  1. Flink
  2. FLINK-6178 Allow upgrades to state serializers
  3. FLINK-6425

Integrate serializer reconfiguration into state restore flow to activate serializer upgrades

    Details

      Description

      With FLINK-6191, TypeSerializer will be reconfigurable.

      From the state backends' point of view, serializer reconfiguration doubles as a mechanism to determine how serializer upgrades should be handled.

      The general idea is that state checkpoints should contain the following as the state's metainfo:

      • the previous serializer
      • snapshot of the previous serializer's configuration

      The upgrade flow is as follows:
      1. On restore, try to deserialize the previous serializer. Deserialization may fail if a) the serializer class no longer exists in the classpath, or b) the serializer class is no longer valid (i.e., its implementation changed and resulted in a different serialVersionUID). In either case, use a dummy serializer as a placeholder. This dummy serializer is currently the ClassNotFoundProxySerializer in the code.

      2. Deserialize the configuration snapshot of the previous serializer. The configuration snapshot must be deserialized successfully; otherwise, the state restore fails.

      3. When we get the newly registered serializer for the state (which could be a completely new serializer, the same serializer with a different implementation, or the exact same serializer untouched; in every case it is treated as a new serializer), we use the configuration snapshot of the old serializer to reconfigure the new serializer.
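Step 1 can be illustrated with plain Java serialization. The sketch below is a simplified, self-contained model, not Flink's actual proxy code: `OldSerializer`, `ChangedSerializer`, `PlaceholderSerializer` and the scenario strings are hypothetical, and a custom `resolveClass` simulates a changed classpath (a missing class, or a class whose serialVersionUID no longer matches).

```java
import java.io.*;

// Hypothetical model of restore step 1; none of these classes are Flink APIs.
class OldSerializerRestoreSketch {

    /** Stand-in for the serializer that was Java-serialized into the checkpoint. */
    static class OldSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** Stand-in for a later implementation of that serializer with a different serialVersionUID. */
    static class ChangedSerializer implements Serializable {
        private static final long serialVersionUID = 2L;
    }

    /** Plays the role of the dummy placeholder serializer; records why the real one was unavailable. */
    static final class PlaceholderSerializer {
        final String reason;
        PlaceholderSerializer(String reason) { this.reason = reason; }
    }

    /** Simulates the classpath at restore time: "intact", "missing" or "changed" for OldSerializer. */
    static final class SimulatedClasspathStream extends ObjectInputStream {
        private final String scenario;

        SimulatedClasspathStream(InputStream in, String scenario) throws IOException {
            super(in);
            this.scenario = scenario;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            if (desc.getName().equals(OldSerializer.class.getName())) {
                if (scenario.equals("missing")) {
                    throw new ClassNotFoundException(desc.getName()); // class deleted from the classpath
                }
                if (scenario.equals("changed")) {
                    return ChangedSerializer.class; // stream says UID 1, local class says UID 2
                }
            }
            return super.resolveClass(desc);
        }
    }

    static byte[] javaSerialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object readOldSerializer(byte[] bytes, String scenario) throws IOException {
        try (ObjectInputStream in = new SimulatedClasspathStream(new ByteArrayInputStream(bytes), scenario)) {
            return in.readObject();
        } catch (ClassNotFoundException | InvalidClassException e) {
            // Restore continues with a placeholder; whether that is good enough is decided later,
            // once the new serializer has been confronted with the configuration snapshot.
            return new PlaceholderSerializer(e.getClass().getSimpleName());
        }
    }
}
```

Both failure modes end in the placeholder, so the restore can still proceed to step 2 and read the configuration snapshot.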

      This completes the upgrade of the old serializer. However, depending on the result of the upgrade, state conversion may need to take place (for now, if state conversion is required, we simply fail the job, as this functionality isn't available yet). The possible results are:

      • Compatible: restore succeeds and the serializer is upgraded.
      • Compatible, but serialization schema changed: the serializer is upgraded, but state conversion is required; the old serializer does not need to be present.
      • Incompatible: the serializer is upgraded, but state conversion is required and the old serializer must be present (i.e., it cannot be the dummy ClassNotFoundProxySerializer).
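The three outcomes can be mapped to restore actions roughly as follows. This is a hypothetical sketch: the enum names and the `conversionSupported` flag are illustrative assumptions, with `conversionSupported = false` modelling the current behaviour of failing the job whenever conversion is needed.

```java
// Hypothetical names; this only models the decision table described above.
class UpgradeOutcomeSketch {

    enum UpgradeResult { COMPATIBLE, COMPATIBLE_SCHEMA_CHANGED, INCOMPATIBLE }

    enum RestoreAction { RESTORE, CONVERT_STATE, FAIL_JOB }

    /**
     * @param result              outcome of reconfiguring the new serializer with the old config snapshot
     * @param oldSerializerUsable false if the old serializer was replaced by the dummy placeholder
     * @param conversionSupported whether the backend can convert state (currently false)
     */
    static RestoreAction decide(UpgradeResult result, boolean oldSerializerUsable, boolean conversionSupported) {
        switch (result) {
            case COMPATIBLE:
                return RestoreAction.RESTORE;
            case COMPATIBLE_SCHEMA_CHANGED:
                // Conversion is needed, but the old serializer itself is not required.
                return conversionSupported ? RestoreAction.CONVERT_STATE : RestoreAction.FAIL_JOB;
            case INCOMPATIBLE:
                // Conversion is needed, and old data can only be read with the real old serializer.
                return (conversionSupported && oldSerializerUsable)
                        ? RestoreAction.CONVERT_STATE
                        : RestoreAction.FAIL_JOB;
            default:
                throw new IllegalArgumentException("Unknown result: " + result);
        }
    }
}
```

Note that an incompatible serializer whose predecessor was replaced by the placeholder fails even once conversion exists, because nothing can read the old bytes.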


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3834

          FLINK-6425 [runtime] Activate serializer upgrades in state backends

          This is a follow-up PR that finalizes serializer upgrades, and is based on #3804 (therefore, only the 2nd and 3rd commits, ed82173 and e77096a, are relevant).

          This PR includes the following changes:
          1. Write configuration snapshots of serializers along with checkpoints (this changes the serialization format of checkpoints).
          2. On restore, confront configuration snapshots with newly registered serializers using the new `TypeSerializer#getMigrationStrategy(TypeSerializerConfigSnapshot)` method.
          3. The serializer upgrade is completed if the confrontation determines that no migration is needed. The confrontation reconfigures the new serializer if required. If the serializer cannot be reconfigured to avoid state migration, the job simply fails (as we currently do not have the actual state migration feature).

          Note that config snapshots are currently only confronted in the `RocksDBKeyedStateBackend`, the only place where this is needed, due to its lazy deserialization. Once eager state migration is in place, the confrontation should happen for all state backends on restore.
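To illustrate why the lazy backend is the one that needs the check: a backend that deserializes everything on restore reads old bytes with the serializer from the checkpoint, while a lazy backend keeps the bytes and decodes them later with whatever serializer is registered now. The sketch below assumes that simplified model and uses a byte order as a stand-in for a serializer's format; none of the names are Flink APIs.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical model: a "serializer" is an int codec whose byte order is its serialization format.
class LazyDeserializationSketch {

    static byte[] serialize(int value, ByteOrder format) {
        return ByteBuffer.allocate(Integer.BYTES).order(format).putInt(value).array();
    }

    static int deserialize(byte[] bytes, ByteOrder format) {
        return ByteBuffer.wrap(bytes).order(format).getInt();
    }

    /** Eager model: on restore, bytes are decoded once with the serializer that wrote them. */
    static int eagerRestoreThenGet(byte[] checkpointed, ByteOrder checkpointedFormat) {
        int restoredObject = deserialize(checkpointed, checkpointedFormat);
        return restoredObject; // later accesses work on objects, not on bytes
    }

    /** Lazy model: bytes are kept and decoded on access with the currently registered serializer. */
    static int lazyGet(byte[] stored, ByteOrder currentFormat) {
        return deserialize(stored, currentFormat);
    }
}
```

With an unchanged format, both paths return 42. If the newly registered serializer changed its format (here, to little-endian), only the lazy path silently returns a wrong value (704643072), which is why such a change has to be detected when the RocksDB backend restores.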

          Tests
          • Serialization compatibility of the new checkpoint format is covered with existing tests.
          • Added a test that makes sure `InvalidClassException` is also caught when deserializing old serializers in the checkpoint (which occurs if the old serializer implementation was changed and results in a new serialVersionUID).
          • Added tests for Java serialization failure resilience when reading the new checkpoints, in `SerializerProxiesTest`.
          • Added end-to-end snapshot + restore tests which require reconfiguration of the `KryoSerializer` and `PojoSerializer` in cases where the registration order of Kryo classes / Pojo types was changed.
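One way to get the failure resilience tested in `SerializerProxiesTest` is to length-prefix the Java-serialized serializer, so that a reader can skip bytes it cannot deserialize and still reach the configuration snapshot. The layout and names below are assumptions for illustration, not the actual checkpoint format introduced by this PR.

```java
import java.io.*;

// Hypothetical record layout: [state name][int length][serializer bytes][snapshot version][snapshot payload]
class MetaInfoFramingSketch {

    static final class RestoredMetaInfo {
        final String stateName;
        final Object serializerOrNull; // null stands in for the dummy placeholder serializer
        final int snapshotVersion;
        final String snapshotPayload;

        RestoredMetaInfo(String stateName, Object serializerOrNull, int snapshotVersion, String snapshotPayload) {
            this.stateName = stateName;
            this.serializerOrNull = serializerOrNull;
            this.snapshotVersion = snapshotVersion;
            this.snapshotPayload = snapshotPayload;
        }
    }

    static byte[] javaSerialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static byte[] write(String stateName, byte[] serializerBytes, int snapshotVersion, String snapshotPayload)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(stateName);
            out.writeInt(serializerBytes.length); // length prefix makes the serializer skippable
            out.write(serializerBytes);
            out.writeInt(snapshotVersion);        // snapshot uses its own, non-Java-serialization format
            out.writeUTF(snapshotPayload);
        }
        return bytes.toByteArray();
    }

    static RestoredMetaInfo read(byte[] record) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
        String stateName = in.readUTF();
        byte[] serializerBytes = new byte[in.readInt()];
        in.readFully(serializerBytes); // consumed fully even if unreadable, so the stream stays aligned
        Object serializer;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(serializerBytes))) {
            serializer = ois.readObject();
        } catch (ClassNotFoundException | InvalidClassException | StreamCorruptedException e) {
            serializer = null; // old serializer unavailable: continue with a placeholder
        }
        // No catch here: an unreadable configuration snapshot fails the restore.
        int snapshotVersion = in.readInt();
        String snapshotPayload = in.readUTF();
        return new RestoredMetaInfo(stateName, serializer, snapshotVersion, snapshotPayload);
    }
}
```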

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6425

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3834.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3834


          commit 538a7acecce0d72e36e3726c0df2b6b96be35fc3
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-05-01T13:32:10Z

          FLINK-6190 [core] Migratable TypeSerializers

          This commit introduces the user-facing APIs for migratable
          TypeSerializers. The new user-facing APIs are:

          • new class: TypeSerializerConfigSnapshot
          • new class: ForwardCompatibleSerializationFormatConfig
          • new method: TypeSerializer#snapshotConfiguration()
          • new method: TypeSerializer#reconfigure(TypeSerializerConfigSnapshot)
          • new enum: ReconfigureResult
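As a rough illustration of how `snapshotConfiguration()` and `reconfigure(...)` fit together, here is a toy serializer whose wire format depends on registration order, similar in spirit to the Kryo registration case mentioned in the PR's tests. Everything here is hypothetical: the class, the list-of-names snapshot and the two result values are not the actual Flink types, and the values of the real `ReconfigureResult` enum are not listed in this commit message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy: a type is encoded as its registration index ("tag").
class TagSerializerSketch {

    enum ReconfigureResultSketch { COMPATIBLE, INCOMPATIBLE }

    private final List<String> registrations;

    TagSerializerSketch(List<String> registrations) {
        this.registrations = new ArrayList<>(registrations);
    }

    /** Tag written for a type: its registration index. */
    int tagFor(String typeName) {
        int tag = registrations.indexOf(typeName);
        if (tag < 0) {
            throw new IllegalArgumentException("Unregistered type: " + typeName);
        }
        return tag;
    }

    /** Analogue of snapshotConfiguration(): what is needed to read data written by this instance. */
    List<String> snapshotConfiguration() {
        return new ArrayList<>(registrations);
    }

    /** Analogue of reconfigure(snapshot): keep old tags stable so old bytes stay readable. */
    ReconfigureResultSketch reconfigure(List<String> previousRegistrations) {
        if (!registrations.containsAll(previousRegistrations)) {
            return ReconfigureResultSketch.INCOMPATIBLE; // a previously registered type is gone
        }
        List<String> reordered = new ArrayList<>(previousRegistrations);
        for (String type : registrations) {
            if (!reordered.contains(type)) {
                reordered.add(type); // newly registered types get new, higher tags
            }
        }
        registrations.clear();
        registrations.addAll(reordered);
        return ReconfigureResultSketch.COMPATIBLE;
    }
}
```

For example, a new serializer registered as `[C, B, A]` and reconfigured with the snapshot `[A, B]` ends up as `[A, B, C]`, so `A` and `B` keep the tags they were written with.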

          commit ed82173fe97c6e9fb0784696bc4c49f10cc4e556
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-05-02T11:35:18Z

          [hotfix] [core] Catch InvalidClassException in TypeSerializerSerializationProxy

          Previously, the TypeSerializerSerializationProxy only used the dummy
          ClassNotFoundDummyTypeSerializer as a placeholder in the case where the
          user uses a completely new serializer and deletes the old one.

          There is also the case where the user changes the original serializer's
          implementation, which results in an InvalidClassException when trying to
          deserialize the serializer. We should also use the
          ClassNotFoundDummyTypeSerializer as a temporary placeholder in this
          case.

          commit e77096af29b4cbea26113928fe93218c075e4035
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-05-06T12:40:58Z

          FLINK-6425 [runtime] Activate serializer upgrades in state backends

          This commit fully activates state serializer upgrades by changing the
          following:

          • Include serializer configuration snapshots in checkpoints
          • On restore, use configuration snapshots to confront new serializers to
            perform the upgrade

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3834#discussion_r115139044

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java —
          @@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
          protected <N, S> ColumnFamilyHandle getColumnFamily(
          StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {

            - Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
            + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
            kvStateInformation.get(descriptor.getName());
            - RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
            - descriptor.getType(),
            - descriptor.getName(),
            - namespaceSerializer,
            - descriptor.getSerializer());
            + RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
            + descriptor.getType(),
            + descriptor.getName(),
            + namespaceSerializer,
            + descriptor.getSerializer());

          if (stateInfo != null) {

            - if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
            + // TODO with eager registration in place, these checks should be moved to restore()
            +
            + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
            + restoredKvStateMetaInfos.get(descriptor.getName());
            +
            + Preconditions.checkState(
            + newMetaInfo.getName().equals(restoredMetaInfo.getName()),
            + "Incompatible state names. " +
            + "Was [" + restoredMetaInfo.getName() + "], " +
            + "registered with [" + newMetaInfo.getName() + "].");
            +
            + if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
            + && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
            +
            + Preconditions.checkState(
            + newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
            + "Incompatible state types. " +
            + "Was [" + restoredMetaInfo.getStateType() + "], " +
            + "registered with [" + newMetaInfo.getStateType() + "].");
            + }
            +
            + // check serializer migration strategies to determine if state migration is required
            +
            + boolean requireMigration = false;
            +
            + // only check migration strategy if there is a restored configuration snapshot;
            + // there wouldn't be one if we were restored from an older version checkpoint,
            + // in which case we can only simply assume that migration is not required
            +
            + if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
            + MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer()
            + .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
            +
            + TypeSerializer<N> finalOldNamespaceSerializer;
            + if (namespaceMigrationStrategy.requireMigration()) {
            + requireMigration = true;
            +
            + if (namespaceMigrationStrategy.getFallbackDeserializer() != null) {
            + finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer();
            + } else if (restoredMetaInfo.getNamespaceSerializer() != null
            + && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) {
            + finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer();
            + } else {
            + throw new RuntimeException(
            + "State migration required, but there is no available serializer capable of reading previous namespace.");
            + }
            + }
            + }
            +
            + if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) {

              -- End diff --

          This almost looks like duplicated code from the previous `if`. Maybe we can create a common helper method to do this and avoid (partial) code duplication.
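A possible shape for the suggested helper, keeping the precedence used in the diff (fallback deserializer first, then the restored serializer unless it is a placeholder). The types are minimal hypothetical stand-ins for `TypeSerializer`, `MigrationStrategy` and `MigrationNamespaceSerializerProxy`, so the sketch compiles on its own.

```java
import java.util.Optional;

// Hypothetical stand-ins; not the real Flink classes.
class MigrationCheckHelperSketch {

    interface Serializer<T> {}

    /** Stand-in for a placeholder such as MigrationNamespaceSerializerProxy. */
    static final class PlaceholderSerializer<T> implements Serializer<T> {}

    static final class Strategy<T> {
        final boolean requiresMigration;
        final Serializer<T> fallbackDeserializer; // may be null

        Strategy(boolean requiresMigration, Serializer<T> fallbackDeserializer) {
            this.requiresMigration = requiresMigration;
            this.fallbackDeserializer = fallbackDeserializer;
        }
    }

    /**
     * Returns empty if no migration is required, otherwise the serializer that can read the
     * previous data; throws if migration is required but no such serializer is available.
     */
    static <T> Optional<Serializer<T>> oldSerializerForMigration(
            Strategy<T> strategy, Serializer<T> restored, String what) {
        if (!strategy.requiresMigration) {
            return Optional.empty();
        }
        if (strategy.fallbackDeserializer != null) {
            return Optional.of(strategy.fallbackDeserializer);
        }
        if (restored != null && !(restored instanceof PlaceholderSerializer<?>)) {
            return Optional.of(restored);
        }
        throw new IllegalStateException(
                "State migration required, but there is no available serializer capable of reading previous "
                        + what + ".");
    }
}
```

With such a helper, each duplicated `if` block would reduce to one call per serializer (namespace and state), with `requireMigration` set if either call returns a serializer.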

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3834#discussion_r115139339

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java —
          @@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
          protected <N, S> ColumnFamilyHandle getColumnFamily(
          StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {

            - Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
            + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
            kvStateInformation.get(descriptor.getName());
            - RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
            - descriptor.getType(),
            - descriptor.getName(),
            - namespaceSerializer,
            - descriptor.getSerializer());
            + RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
            + descriptor.getType(),
            + descriptor.getName(),
            + namespaceSerializer,
            + descriptor.getSerializer());

          if (stateInfo != null) {

            - if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
            + // TODO with eager registration in place, these checks should be moved to restore()
            +
            + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
            + restoredKvStateMetaInfos.get(descriptor.getName());
            +
            + Preconditions.checkState(
            + newMetaInfo.getName().equals(restoredMetaInfo.getName()),
            + "Incompatible state names. " +
            + "Was [" + restoredMetaInfo.getName() + "], " +
            + "registered with [" + newMetaInfo.getName() + "].");
            +
            + if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
            + && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
            +
            + Preconditions.checkState(
            + newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
            + "Incompatible state types. " +
            + "Was [" + restoredMetaInfo.getStateType() + "], " +
            + "registered with [" + newMetaInfo.getStateType() + "].");
            + }
            +
            + // check serializer migration strategies to determine if state migration is required
            +
            + boolean requireMigration = false;
            +
            + // only check migration strategy if there is a restored configuration snapshot;
            + // there wouldn't be one if we were restored from an older version checkpoint,
            + // in which case we can only simply assume that migration is not required
            +
            + if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
            + MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer()
            + .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
            +
            + TypeSerializer<N> finalOldNamespaceSerializer;
            + if (namespaceMigrationStrategy.requireMigration()) {
            + requireMigration = true;
            +
            + if (namespaceMigrationStrategy.getFallbackDeserializer() != null) {
            + finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer();
            + } else if (restoredMetaInfo.getNamespaceSerializer() != null
            + && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) {
            + finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer();
            + } else {
            + throw new RuntimeException(
            + "State migration required, but there is no available serializer capable of reading previous namespace.");
            + }
            + }
            + }
            +
            + if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) {
            + MigrationStrategy<S> stateMigrationStrategy = newMetaInfo.getStateSerializer()
            + .getMigrationStrategyFor(restoredMetaInfo.getStateSerializerConfigSnapshot());
            +
            + TypeSerializer<S> finalOldStateSerializer;
            + if (stateMigrationStrategy.requireMigration()) {
            + requireMigration = true;
            +
            + if (stateMigrationStrategy.getFallbackDeserializer() != null) {

              -- End diff --

          This whole `if` is interesting: basically you give the `FallbackDeserializer` priority over the actual old serializer that we could get via Java serialization.
          Originally, I thought the intended flow was: check compatibility by confronting the user-provided serializer with the stored config. If we need to convert, first try to load the former serializer through Java serialization (because, if we succeed, this is a safe bet that this class can read the old state). If Java deserialization fails, we use the `FallbackSerializer` provided by the new serializer (this should also be a safe bet, unless the implementation is wrong, so overall slightly less safe).

          Now here is the question: I think in the serializer you combine the compatibility check and the potential creation of the `FallbackSerializer` in a single method, because both methods would partially do similar and duplicated work.
          The downside is that, given the intended flow, we only need to create and use a `FallbackSerializer` if we cannot load the old serializer through Java deserialization.

          I can see that this flow could also work, or we can still use the flow that prioritizes Java serialization and potentially creates an unused `FallbackSerializer`.

          What do you think, and is there some point I did not consider that led to this choice?
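The alternative ordering described in this comment could look roughly like this: use the Java-deserialized old serializer when it is a real one, and only build a fallback when it is not. Passing a `Supplier` is an assumption of this sketch (not part of the PR) that addresses the "unused `FallbackSerializer`" concern by deferring its construction; the types are hypothetical stand-ins.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins; not the real Flink classes.
class PreferRestoredSerializerSketch {

    interface Serializer<T> {}

    /** Stand-in for the dummy placeholder used when Java deserialization failed. */
    static final class PlaceholderSerializer<T> implements Serializer<T> {}

    static <T> Serializer<T> serializerForOldData(Serializer<T> restored, Supplier<Serializer<T>> fallbackFactory) {
        if (restored != null && !(restored instanceof PlaceholderSerializer<?>)) {
            return restored; // the class that actually wrote the data: the safest choice
        }
        Serializer<T> fallback = fallbackFactory.get(); // only now pay for creating the fallback
        if (fallback == null) {
            throw new IllegalStateException("State migration required, but no serializer can read the previous data.");
        }
        return fallback;
    }
}
```

The trade-off is that the compatibility check and the fallback construction become separate steps again, which is the partial duplication the PR's combined method was avoiding.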

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3834#discussion_r115139379

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/MigrationStrategy.java —
          @@ -0,0 +1,87 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.common.typeutils;
          +
          +import org.apache.flink.annotation.PublicEvolving;
          +
          +/**
          + * A {@code MigrationStrategy} contains information about how to perform migration of data written
          + * by an older serializer so that new serializers can continue to work on them.
          + *
          + * @param <T> the type of the data being migrated.
          + */
          +@PublicEvolving
          +public final class MigrationStrategy<T> {
          +
          + /** Whether or not migration is required. */
          + private final boolean requiresStateMigration;
          +
          + /**
          + * The fallback deserializer to use, in the case the preceding serializer cannot be found.
          + *
          + * <p>This is only relevant if migration is required.
          + */
          + private final TypeSerializer<T> fallbackDeserializer;
          +
          + /**
          + * Returns a strategy that simply signals that no migration needs to be performed.
          + *
          + * @return a strategy that does not perform migration
          + */
          + public static <T> MigrationStrategy<T> noMigration() {
          + return new MigrationStrategy<>(false, null);
          + }
          +
          + /**
          + * Returns a strategy that signals migration to be performed, and in the case that the
          + * preceding serializer cannot be found, a provided fallback deserializer can be
          + * used.
          + *
          + * @param fallbackDeserializer a fallback deserializer that can be used to read old data for the migration
          + * in the case that the preceding serializer cannot be found.
          + *
          + * @return a strategy that performs migration with a fallback deserializer to read old data.
          + */
          + public static <T> MigrationStrategy<T> migrateWithFallbackDeserializer(TypeSerializer<T> fallbackDeserializer) {
          + return new MigrationStrategy<>(true, fallbackDeserializer);
          + }
          +
          + /**
          + * Returns a strategy that signals migration to be performed, without a fallback deserializer.
          + * If the preceding serializer cannot be found, the migration fails because the old data cannot be read.
          + *
          + * @return a strategy that performs migration, without a fallback deserializer.
          + */
          + public static <T> MigrationStrategy<T> migrate() {
          — End diff –

          Maybe this should be named `migrationRequired`?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3834#discussion_r115139440

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java —
          @@ -161,7 +162,93 @@

          public abstract int hashCode();

          - public boolean canRestoreFrom(TypeSerializer<?> other) {
          - return equals(other);
            + // --------------------------------------------------------------------------------------------
            + // Serializer configuration snapshotting & reconfiguring
            + // --------------------------------------------------------------------------------------------
            +
            + /**
            + * Create a snapshot of the serializer's current configuration to be stored along with the managed state it is
            + * registered to (if any - this method is only relevant if this serializer is registered for serialization of
            + * managed state).
            + *
            + * <p>The configuration snapshot should contain information about the serializer's parameter settings and its
            + * serialization format. When a new serializer is registered to serialize the same managed state that this
            + * serializer was registered to, the returned configuration snapshot can be used to check with the new serializer
            + * if any data migration needs to take place.
            + *
            + * <p>Implementations can also return the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}
            + * configuration if they guarantee forwards compatibility. For example, implementations that use serialization
            + * frameworks with built-in serialization compatibility, such as <a href=https://thrift.apache.org/>Thrift</a> or
            + * <a href=https://developers.google.com/protocol-buffers/>Protobuf</a>, is suitable for this usage pattern. By
            + * returning the {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that when managed
            + * state serialized using this serializer is restored, there is no need to check for migration with the new
            + * serializer for the same state. In other words, new serializers are always assumed to be fully compatible for the
            + * serialized state.
            + *
            + * @see TypeSerializerConfigSnapshot
            + * @see ForwardCompatibleSerializationFormatConfig
            + *
            + * @return snapshot of the serializer's current configuration.
            + */
            + public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
            +
            + /**
            + * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding
            + * serializer that was registered for serialization of the same managed state (if any - this method is only
            + * relevant if this serializer is registered for serialization of managed state).
            + *
            + * <p>Implementations need to return the resolved migration strategy. The strategy can be one of the following:
            + * <ul>
            + * <li>{@link MigrationStrategy#noMigration()}: this signals Flink that this serializer is compatible, or
            + * has been reconfigured to be compatible, to continue reading old data, and that the
            + * serialization schema remains the same. No migration needs to be performed.</li>
            + *
            + * <li>{@link MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this signals Flink that
            + * migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
            + * compatible, for old data. Furthermore, in the case that the preceding serializer cannot be found or
            + * restored to read the old data, the provided fallback deserializer can be used.</li>
            + *
            + * <li>{@link MigrationStrategy#migrate()}: this signals Flink that migration needs to be performed, because
            + * this serializer is not compatible, or cannot be reconfigured to be compatible, for old data.</li>
            + * </ul>
            + *
            + * <p>This method is guaranteed to only be invoked if the preceding serializer's configuration snapshot is not the
            + * singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such cases, Flink always
            + * assume that the migration strategy is {@link MigrationStrategy#migrate()}.
            + *
            + * @see MigrationStrategy
            + *
            + * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
            + *
            + * @return the result of the reconfiguration.
            + */
            + protected abstract MigrationStrategy<T> getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);
            +
            + /**
            + * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding
            + * serializer that was registered for serialization of the same managed state (if any - this method is only
            + * relevant if this serializer is registered for serialization of managed state).
            + *
            + * <p>This method is not part of the public user-facing API, and cannot be overriden. External operations
            + * providing a configuration snapshot of preceding serializer can only do so through this method.
            + *
            + * <p>This method always assumes that the migration strategy is {@link MigrationStrategy#noMigration()} if
            + * the provided configuration snapshot is the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}.
            + * Otherwise, the configuration snapshot is provided to the actual
            + * {@link #getMigrationStrategy(TypeSerializerConfigSnapshot)} (TypeSerializerConfigSnapshot)} implementation.
            + *
            + * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
            + *
            + * @return the result of the reconfiguration.
            + */
            + @Internal
            + public final MigrationStrategy<T> getMigrationStrategyFor(TypeSerializerConfigSnapshot configSnapshot) {
            + // reference equality is viable here, because the forward compatible
            + // marker config will always be explicitly restored with the singleton instance
            + if (configSnapshot != ForwardCompatibleSerializationFormatConfig.INSTANCE) {
          — End diff –

          I see the intention, but I think it is not a good idea to have this `final` default implementation built around `ForwardCompatibleSerializationFormatConfig`.

          Imagine a case where the old version of a serializer is `ForwardCompatible` and the new serializer that replaces it is not. Then this will always bypass the check, even when it shouldn't.
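          To make the concern concrete, here is a minimal, self-contained sketch that models only the control flow of the `final` method above. All types here (`ConfigSnapshot`, `ForwardCompatibleMarker`, `StrictSerializer`, and the two `requiresMigration...` methods) are hypothetical stand-ins, not the Flink classes; the boolean "migration required" replaces `MigrationStrategy` for brevity.

```java
/**
 * Simplified, hypothetical model of the final getMigrationStrategyFor() under review.
 * None of these types are the real Flink classes; they only mirror the control flow.
 */
public class ForwardCompatBypassDemo {

    /** Stand-in for TypeSerializerConfigSnapshot. */
    interface ConfigSnapshot {}

    /** Stand-in for the ForwardCompatibleSerializationFormatConfig.INSTANCE marker. */
    enum ForwardCompatibleMarker implements ConfigSnapshot { INSTANCE }

    /** Config snapshot written by the strict serializer below. */
    static final class StrictConfig implements ConfigSnapshot {}

    abstract static class Serializer {
        /** User-overridable check (getMigrationStrategy in the PR); true means "migration required". */
        protected abstract boolean requiresMigration(ConfigSnapshot previous);

        /** Mirrors the final method in the PR: short-circuits on the marker. */
        final boolean requiresMigrationAsInPr(ConfigSnapshot previous) {
            if (previous == ForwardCompatibleMarker.INSTANCE) {
                return false; // the new serializer is never consulted
            }
            return requiresMigration(previous);
        }

        /** Alternative: the new serializer always decides; the marker is just another snapshot. */
        final boolean requiresMigrationAlwaysAsk(ConfigSnapshot previous) {
            return requiresMigration(previous);
        }
    }

    /** A new serializer that is NOT forward compatible: it can only read data it wrote itself. */
    static final class StrictSerializer extends Serializer {
        @Override
        protected boolean requiresMigration(ConfigSnapshot previous) {
            return !(previous instanceof StrictConfig);
        }
    }

    public static void main(String[] args) {
        Serializer newSerializer = new StrictSerializer();
        ConfigSnapshot oldConfig = ForwardCompatibleMarker.INSTANCE;
        System.out.println("as in PR:   " + newSerializer.requiresMigrationAsInPr(oldConfig));
        System.out.println("always ask: " + newSerializer.requiresMigrationAlwaysAsk(oldConfig));
    }
}
```

          With an old forward-compatible snapshot, the PR-style method reports that no migration is needed even though the strict new serializer would have asked for one; letting the new serializer always decide avoids that.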

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3834#discussion_r115139530

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java —
          @@ -161,7 +162,93 @@

          public abstract int hashCode();

          - public boolean canRestoreFrom(TypeSerializer<?> other) {
          - return equals(other);
            + // --------------------------------------------------------------------------------------------
            + // Serializer configuration snapshotting & reconfiguring
            + // --------------------------------------------------------------------------------------------
            +
            + /**
            + * Create a snapshot of the serializer's current configuration to be stored along with the managed state it is
            + * registered to (if any - this method is only relevant if this serializer is registered for serialization of
            + * managed state).
            + *
            + * <p>The configuration snapshot should contain information about the serializer's parameter settings and its
            + * serialization format. When a new serializer is registered to serialize the same managed state that this
            + * serializer was registered to, the returned configuration snapshot can be used to check with the new serializer
            + * if any data migration needs to take place.
            + *
            + * <p>Implementations can also return the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}
            + * configuration if they guarantee forwards compatibility. For example, implementations that use serialization
            + * frameworks with built-in serialization compatibility, such as <a href=https://thrift.apache.org/>Thrift</a> or
            + * <a href=https://developers.google.com/protocol-buffers/>Protobuf</a>, is suitable for this usage pattern. By
            + * returning the {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that when managed
            + * state serialized using this serializer is restored, there is no need to check for migration with the new
            + * serializer for the same state. In other words, new serializers are always assumed to be fully compatible for the
            + * serialized state.
            + *
            + * @see TypeSerializerConfigSnapshot
            + * @see ForwardCompatibleSerializationFormatConfig
            + *
            + * @return snapshot of the serializer's current configuration.
            + */
            + public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
            +
            + /**
            + * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding
            + * serializer that was registered for serialization of the same managed state (if any - this method is only
            + * relevant if this serializer is registered for serialization of managed state).
            + *
            + * <p>Implementations need to return the resolved migration strategy. The strategy can be one of the following:
            + * <ul>
            + * <li>{@link MigrationStrategy#noMigration()}: this signals Flink that this serializer is compatible, or
            + * has been reconfigured to be compatible, to continue reading old data, and that the
            + * serialization schema remains the same. No migration needs to be performed.</li>
            + *
            + * <li>{@link MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this signals Flink that
            + * migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
            + * compatible, for old data. Furthermore, in the case that the preceding serializer cannot be found or
            + * restored to read the old data, the provided fallback deserializer can be used.</li>
            + *
            + * <li>{@link MigrationStrategy#migrate()}: this signals Flink that migration needs to be performed, because
            + * this serializer is not compatible, or cannot be reconfigured to be compatible, for old data.</li>
            + * </ul>
            + *
            + * <p>This method is guaranteed to only be invoked if the preceding serializer's configuration snapshot is not the
            + * singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such cases, Flink always
            + * assume that the migration strategy is {@link MigrationStrategy#migrate()}.
            + *
            + * @see MigrationStrategy
            + *
            + * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
            + *
            + * @return the result of the reconfiguration.
            + */
            + protected abstract MigrationStrategy<T> getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);
          — End diff –

          This method currently does (up to) three things: (maybe) reconfigure the serializer, advise whether we need to convert, and (maybe) provide a backwards-compatible serializer. Maybe this could be stated as a high-level overview at the beginning of the doc, followed by all the details.

          Given this, I think a name like `ensureCompatibility` returning a `CompatibilityResult` could also be a better fit?
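          A rough sketch of what that suggested shape could look like, with the three responsibilities marked in comments. `ensureCompatibility` and `CompatibilityResult` are only the names proposed above; `TagSerializer` and `RegistrationConfig` are hypothetical toy types (a serializer that writes a type as its index in a registration list), not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class EnsureCompatibilitySketch {

    /** Hypothetical result type: whether migration is needed, plus an optional fallback reader. */
    static final class CompatibilityResult {
        private final boolean requiresMigration;
        private final TagSerializer fallbackDeserializer; // null if none is offered

        private CompatibilityResult(boolean requiresMigration, TagSerializer fallbackDeserializer) {
            this.requiresMigration = requiresMigration;
            this.fallbackDeserializer = fallbackDeserializer;
        }

        static CompatibilityResult compatible() {
            return new CompatibilityResult(false, null);
        }

        static CompatibilityResult migrationRequired(TagSerializer fallbackDeserializer) {
            return new CompatibilityResult(true, fallbackDeserializer);
        }

        boolean isMigrationRequired() {
            return requiresMigration;
        }

        Optional<TagSerializer> getFallbackDeserializer() {
            return Optional.ofNullable(fallbackDeserializer);
        }
    }

    /** Hypothetical config snapshot: the registration order that determined the written tags. */
    static final class RegistrationConfig {
        final List<String> registeredTypes;

        RegistrationConfig(List<String> registeredTypes) {
            this.registeredTypes = new ArrayList<>(registeredTypes);
        }
    }

    /** Toy serializer that writes a type name as its index in the registration list. */
    static final class TagSerializer {
        private List<String> registeredTypes;

        TagSerializer(List<String> registeredTypes) {
            this.registeredTypes = new ArrayList<>(registeredTypes);
        }

        int tagOf(String type) {
            return registeredTypes.indexOf(type);
        }

        RegistrationConfig snapshotConfiguration() {
            return new RegistrationConfig(registeredTypes);
        }

        CompatibilityResult ensureCompatibility(RegistrationConfig previous) {
            if (!registeredTypes.containsAll(previous.registeredTypes)) {
                // (2) an old tag has no counterpart here, so migration is required;
                // (3) a serializer rebuilt from the old registrations can still read the old data
                return CompatibilityResult.migrationRequired(new TagSerializer(previous.registeredTypes));
            }
            // (1) reconfigure: keep the old tags stable and append new registrations after them
            Set<String> reordered = new LinkedHashSet<>(previous.registeredTypes);
            reordered.addAll(registeredTypes);
            registeredTypes = new ArrayList<>(reordered);
            return CompatibilityResult.compatible();
        }
    }

    public static void main(String[] args) {
        TagSerializer newSerializer = new TagSerializer(Arrays.asList("B", "A", "C"));
        RegistrationConfig oldConfig = new RegistrationConfig(Arrays.asList("A", "B"));
        CompatibilityResult result = newSerializer.ensureCompatibility(oldConfig);
        System.out.println(result.isMigrationRequired()); // false
        System.out.println(newSerializer.tagOf("A") + " " + newSerializer.tagOf("B") + " " + newSerializer.tagOf("C")); // 0 1 2
    }
}
```

          Here the new serializer reorders its registrations so the old tags stay valid and reports "compatible"; only if an old registration is missing does it report that migration is required and offer a fallback reader.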

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3834#discussion_r115141541

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java —
          @@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
          protected <N, S> ColumnFamilyHandle getColumnFamily(
          StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {

          - Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
            + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
            kvStateInformation.get(descriptor.getName());
          - RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
          - descriptor.getType(),
          - descriptor.getName(),
          - namespaceSerializer,
          - descriptor.getSerializer());
            + RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
            + descriptor.getType(),
            + descriptor.getName(),
            + namespaceSerializer,
            + descriptor.getSerializer());

          if (stateInfo != null) {

          - if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
            + // TODO with eager registration in place, these checks should be moved to restore()
            +
            + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
            + restoredKvStateMetaInfos.get(descriptor.getName());
            +
            + Preconditions.checkState(
            + newMetaInfo.getName().equals(restoredMetaInfo.getName()),
            + "Incompatible state names. " +
            + "Was [" + restoredMetaInfo.getName() + "], " +
            + "registered with [" + newMetaInfo.getName() + "].");
            +
            + if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
            + && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
            +
            + Preconditions.checkState(
            + newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
            + "Incompatible state types. " +
            + "Was [" + restoredMetaInfo.getStateType() + "], " +
            + "registered with [" + newMetaInfo.getStateType() + "].");
            + }
            +
            + // check serializer migration strategies to determine if state migration is required
            +
            + boolean requireMigration = false;
            +
            + // only check migration strategy if there is a restored configuration snapshot;
            + // there wouldn't be one if we were restored from an older version checkpoint,
            + // in which case we can only simply assume that migration is not required
            +
            + if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
            + MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer()
            + .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
            +
            + TypeSerializer<N> finalOldNamespaceSerializer;
            + if (namespaceMigrationStrategy.requireMigration()) {
            + requireMigration = true;
            +
            + if (namespaceMigrationStrategy.getFallbackDeserializer() != null) {
            + finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer();
            + } else if (restoredMetaInfo.getNamespaceSerializer() != null
            + && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) {
            + finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer();
            + } else {
            + throw new RuntimeException(
            + "State migration required, but there is no available serializer capable of reading previous namespace.");
            + }
            + }
            + }
            +
            + if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) {
            + MigrationStrategy<S> stateMigrationStrategy = newMetaInfo.getStateSerializer()
            + .getMigrationStrategyFor(restoredMetaInfo.getStateSerializerConfigSnapshot());
            +
            + TypeSerializer<S> finalOldStateSerializer;
            + if (stateMigrationStrategy.requireMigration()) {
            + requireMigration = true;
            +
            + if (stateMigrationStrategy.getFallbackDeserializer() != null) {

              • End diff –

          You're correct that, in the current PR, the fallback serializer has higher priority. But I agree that, logically, the old Java-serialized serializer is the safer bet and should have higher priority.

          I think the better balance is to go with the flow that prioritizes the Java-serialized serializer, even though it may create a fallback serializer that ends up unused. It is a "fallback", anyway, and with this flow the user implementation can be more compact (as you noticed, there is a lot of duplicate logic for the user to implement if we split this up into 2 methods).
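          The resolution order agreed on above (restored Java-serialized serializer first, fallback deserializer second, otherwise fail) can be sketched roughly as follows. This is a minimal, self-contained illustration, not the PR's code: `Serializer`, `MissingSerializerProxy`, `Strategy` and `resolveOldSerializer` are hypothetical stand-ins loosely modeled on `TypeSerializer`, `MigrationNamespaceSerializerProxy`, `MigrationStrategy` and `getFallbackDeserializer()` from the diff.

```java
import java.util.Optional;

// Hypothetical sketch of the "prefer the restored serializer" flow; not Flink's actual classes.
public class OldSerializerResolution {

    /** Hypothetical stand-in for a TypeSerializer; only carries a name for illustration. */
    public interface Serializer<T> {
        String name();
    }

    /** Hypothetical placeholder used when the previous serializer class could not be loaded. */
    public static final class MissingSerializerProxy<T> implements Serializer<T> {
        @Override
        public String name() {
            return "missing-proxy";
        }
    }

    /** Hypothetical migration strategy: whether migration is needed, plus an optional fallback. */
    public static final class Strategy<T> {
        final boolean requiresMigration;
        final Serializer<T> fallbackDeserializer; // may be null

        public Strategy(boolean requiresMigration, Serializer<T> fallbackDeserializer) {
            this.requiresMigration = requiresMigration;
            this.fallbackDeserializer = fallbackDeserializer;
        }
    }

    /** Picks the serializer that reads the old state bytes, or returns empty if no migration is needed. */
    public static <T> Optional<Serializer<T>> resolveOldSerializer(
            Serializer<T> restoredSerializer, Strategy<T> strategy) {
        if (!strategy.requiresMigration) {
            // The new serializer can read the old bytes directly.
            return Optional.empty();
        }
        // 1. Prefer the Java-serialized previous serializer, if it was actually restored.
        if (restoredSerializer != null && !(restoredSerializer instanceof MissingSerializerProxy)) {
            return Optional.of(restoredSerializer);
        }
        // 2. Otherwise use the fallback deserializer provided by the new serializer.
        if (strategy.fallbackDeserializer != null) {
            return Optional.of(strategy.fallbackDeserializer);
        }
        // 3. Nothing can read the old bytes, so the restore has to fail.
        throw new IllegalStateException(
                "State migration required, but no serializer capable of reading the previous state is available.");
    }
}
```

          With this ordering the fallback deserializer may be constructed but never used whenever the old serializer restores fine, which is the trade-off accepted above in exchange for a single, more compact user-facing method.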

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3834#discussion_r115141548

          — Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java —
          @@ -1473,22 +1481,92 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception
          protected <N, S> ColumnFamilyHandle getColumnFamily(
          StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException {

          • Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
            + Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
            kvStateInformation.get(descriptor.getName());
          • RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
          • descriptor.getType(),
          • descriptor.getName(),
          • namespaceSerializer,
          • descriptor.getSerializer());
            + RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
            + descriptor.getType(),
            + descriptor.getName(),
            + namespaceSerializer,
            + descriptor.getSerializer());

          if (stateInfo != null) {

          • if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
            + // TODO with eager registration in place, these checks should be moved to restore()
            +
            + RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
            + restoredKvStateMetaInfos.get(descriptor.getName());
            +
            + Preconditions.checkState(
            + newMetaInfo.getName().equals(restoredMetaInfo.getName()),
            + "Incompatible state names. " +
            + "Was [" + restoredMetaInfo.getName() + "], " +
            + "registered with [" + newMetaInfo.getName() + "].");
            +
            + if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
            + && !restoredMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
            +
            + Preconditions.checkState(
            + newMetaInfo.getStateType().equals(restoredMetaInfo.getStateType()),
            + "Incompatible state types. " +
            + "Was [" + restoredMetaInfo.getStateType() + "], " +
            + "registered with [" + newMetaInfo.getStateType() + "].");
            + }
            +
            + // check serializer migration strategies to determine if state migration is required
            +
            + boolean requireMigration = false;
            +
            + // only check migration strategy if there is a restored configuration snapshot;
            + // there wouldn't be one if we were restored from an older version checkpoint,
            + // in which case we can only simply assume that migration is not required
            +
            + if (restoredMetaInfo.getNamespaceSerializerConfigSnapshot() != null) {
            + MigrationStrategy<N> namespaceMigrationStrategy = newMetaInfo.getNamespaceSerializer()
            + .getMigrationStrategyFor(restoredMetaInfo.getNamespaceSerializerConfigSnapshot());
            +
            + TypeSerializer<N> finalOldNamespaceSerializer;
            + if (namespaceMigrationStrategy.requireMigration()) {
            + requireMigration = true;
            +
            + if (namespaceMigrationStrategy.getFallbackDeserializer() != null) {
            + finalOldNamespaceSerializer = namespaceMigrationStrategy.getFallbackDeserializer();
            + } else if (restoredMetaInfo.getNamespaceSerializer() != null
            + && !(restoredMetaInfo.getNamespaceSerializer() instanceof MigrationNamespaceSerializerProxy)) {
            + finalOldNamespaceSerializer = restoredMetaInfo.getNamespaceSerializer();
            + } else {
            + throw new RuntimeException(
            + "State migration required, but there is no available serializer capable of reading previous namespace.");
            + }
            + }
            + }
            +
            + if (restoredMetaInfo.getStateSerializerConfigSnapshot() != null) {
            + MigrationStrategy<S> stateMigrationStrategy = newMetaInfo.getStateSerializer()
            + .getMigrationStrategyFor(restoredMetaInfo.getStateSerializerConfigSnapshot());
            +
            + TypeSerializer<S> finalOldStateSerializer;
            + if (stateMigrationStrategy.requireMigration()) {
            + requireMigration = true;
            +
            + if (stateMigrationStrategy.getFallbackDeserializer() != null) {

              • End diff –

          I'll change this to the suggested new flow.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3834#discussion_r115141563

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java —
          @@ -161,7 +162,93 @@

          public abstract int hashCode();

          • public boolean canRestoreFrom(TypeSerializer<?> other) {
          • return equals(other);
            + // --------------------------------------------------------------------------------------------
            + // Serializer configuration snapshotting & reconfiguring
            + // --------------------------------------------------------------------------------------------
            +
            + /**
            + * Create a snapshot of the serializer's current configuration to be stored along with the managed state it is
            + * registered to (if any - this method is only relevant if this serializer is registered for serialization of
            + * managed state).
            + *
            + * <p>The configuration snapshot should contain information about the serializer's parameter settings and its
            + * serialization format. When a new serializer is registered to serialize the same managed state that this
            + * serializer was registered to, the returned configuration snapshot can be used to check with the new serializer
            + * if any data migration needs to take place.
            + *
            + * <p>Implementations can also return the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}
            + * configuration if they guarantee forwards compatibility. For example, implementations that use serialization
            + * frameworks with built-in serialization compatibility, such as <a href=https://thrift.apache.org/>Thrift</a> or
            + * <a href=https://developers.google.com/protocol-buffers/>Protobuf</a>, is suitable for this usage pattern. By
            + * returning the {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that when managed
            + * state serialized using this serializer is restored, there is no need to check for migration with the new
            + * serializer for the same state. In other words, new serializers are always assumed to be fully compatible for the
            + * serialized state.
            + *
            + * @see TypeSerializerConfigSnapshot
            + * @see ForwardCompatibleSerializationFormatConfig
            + *
            + * @return snapshot of the serializer's current configuration.
            + */
            + public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
            +
            + /**
            + * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding
            + * serializer that was registered for serialization of the same managed state (if any - this method is only
            + * relevant if this serializer is registered for serialization of managed state).
            + *
            + * <p>Implementations need to return the resolved migration strategy. The strategy can be one of the following:
            + * <ul>
            + * <li>{@link MigrationStrategy#noMigration()}: this signals Flink that this serializer is compatible, or
            + * has been reconfigured to be compatible, to continue reading old data, and that the
            + * serialization schema remains the same. No migration needs to be performed.</li>
            + *
            + * <li>{@link MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this signals Flink that
            + * migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
            + * compatible, for old data. Furthermore, in the case that the preceding serializer cannot be found or
            + * restored to read the old data, the provided fallback deserializer can be used.</li>
            + *
            + * <li>{@link MigrationStrategy#migrate()}: this signals Flink that migration needs to be performed, because
            + * this serializer is not compatible, or cannot be reconfigured to be compatible, for old data.</li>
            + * </ul>
            + *
            + * <p>This method is guaranteed to only be invoked if the preceding serializer's configuration snapshot is not the
            + * singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such cases, Flink always
            + * assume that the migration strategy is {@link MigrationStrategy#migrate()}.
            + *
            + * @see MigrationStrategy
            + *
            + * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
            + *
            + * @return the result of the reconfiguration.
            + */
            + protected abstract MigrationStrategy<T> getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);
            +
            + /**
            + * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding
            + * serializer that was registered for serialization of the same managed state (if any - this method is only
            + * relevant if this serializer is registered for serialization of managed state).
            + *
            + * <p>This method is not part of the public user-facing API, and cannot be overriden. External operations
            + * providing a configuration snapshot of preceding serializer can only do so through this method.
            + *
            + * <p>This method always assumes that the migration strategy is {@link MigrationStrategy#noMigration()} if
            + * the provided configuration snapshot is the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}.
            + * Otherwise, the configuration snapshot is provided to the actual
            + * {@link #getMigrationStrategy(TypeSerializerConfigSnapshot)} (TypeSerializerConfigSnapshot)} implementation.
            + *
            + * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
            + *
            + * @return the result of the reconfiguration.
            + */
            + @Internal
            + public final MigrationStrategy<T> getMigrationStrategyFor(TypeSerializerConfigSnapshot configSnapshot) {
            + // reference equality is viable here, because the forward compatible
            + // marker config will always be explicitly restored with the singleton instance
            + if (configSnapshot != ForwardCompatibleSerializationFormatConfig.INSTANCE) {

              • End diff –

          I agree. I've removed the shortcut and have only a single public abstract method now.
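          For illustration, here is roughly what the single-public-abstract-method design means for an implementer: with no `final` wrapper doing a pre-check, callers invoke the abstract method directly and each serializer handles every case itself. This is a hedged sketch under assumptions: `ConfigSnapshot`, `Strategy`, `Serializer`, `resolveMigrationStrategy` and the integer format version are hypothetical names, not the PR's final API, and `NO_MIGRATION`/`MIGRATE` only mirror `noMigration()`/`migrate()` from the quoted Javadoc.

```java
// Hypothetical sketch of a serializer API with one public abstract compatibility method.
public class SingleMethodSketch {

    /** Hypothetical configuration snapshot that records the serializer's binary format version. */
    public static final class ConfigSnapshot {
        final int formatVersion;

        public ConfigSnapshot(int formatVersion) {
            this.formatVersion = formatVersion;
        }
    }

    /** Hypothetical outcomes, mirroring noMigration() and migrate() from the Javadoc. */
    public enum Strategy { NO_MIGRATION, MIGRATE }

    /** Hypothetical serializer base class: one public abstract method, no final wrapper around it. */
    public abstract static class Serializer {
        public abstract ConfigSnapshot snapshotConfiguration();

        // Callers (e.g. a state backend on restore) invoke this directly,
        // so each implementation is responsible for every case itself.
        public abstract Strategy resolveMigrationStrategy(ConfigSnapshot previous);
    }

    /** Example implementation that can only read data written in its own format version. */
    public static final class VersionedSerializer extends Serializer {
        private final int formatVersion;

        public VersionedSerializer(int formatVersion) {
            this.formatVersion = formatVersion;
        }

        @Override
        public ConfigSnapshot snapshotConfiguration() {
            return new ConfigSnapshot(formatVersion);
        }

        @Override
        public Strategy resolveMigrationStrategy(ConfigSnapshot previous) {
            // Same format: old bytes are readable as-is; anything else needs migration.
            return previous.formatVersion == formatVersion ? Strategy.NO_MIGRATION : Strategy.MIGRATE;
        }
    }
}
```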

          Show
          githubbot ASF GitHub Bot added a comment - Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3834#discussion_r115141563 — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java — @@ -161,7 +162,93 @@ public abstract int hashCode(); public boolean canRestoreFrom(TypeSerializer<?> other) { return equals(other); + // -------------------------------------------------------------------------------------------- + // Serializer configuration snapshotting & reconfiguring + // -------------------------------------------------------------------------------------------- + + /** + * Create a snapshot of the serializer's current configuration to be stored along with the managed state it is + * registered to (if any - this method is only relevant if this serializer is registered for serialization of + * managed state). + * + * <p>The configuration snapshot should contain information about the serializer's parameter settings and its + * serialization format. When a new serializer is registered to serialize the same managed state that this + * serializer was registered to, the returned configuration snapshot can be used to check with the new serializer + * if any data migration needs to take place. + * + * <p>Implementations can also return the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} + * configuration if they guarantee forwards compatibility. For example, implementations that use serialization + * frameworks with built-in serialization compatibility, such as <a href= https://thrift.apache.org/ >Thrift</a> or + * <a href= https://developers.google.com/protocol-buffers/ >Protobuf</a>, is suitable for this usage pattern. 
By + * returning the {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} , this informs Flink that when managed + * state serialized using this serializer is restored, there is no need to check for migration with the new + * serializer for the same state. In other words, new serializers are always assumed to be fully compatible for the + * serialized state. + * + * @see TypeSerializerConfigSnapshot + * @see ForwardCompatibleSerializationFormatConfig + * + * @return snapshot of the serializer's current configuration. + */ + public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); + + /** + * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding + * serializer that was registered for serialization of the same managed state (if any - this method is only + * relevant if this serializer is registered for serialization of managed state). + * + * <p>Implementations need to return the resolved migration strategy. The strategy can be one of the following: + * <ul> + * <li> {@link MigrationStrategy#noMigration()}: this signals Flink that this serializer is compatible, or + * has been reconfigured to be compatible, to continue reading old data, and that the + * serialization schema remains the same. No migration needs to be performed.</li> + * + * <li>{@link MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this signals Flink that + * migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be + * compatible, for old data. 
Furthermore, in the case that the preceding serializer cannot be found or + * restored to read the old data, the provided fallback deserializer can be used.</li> + * + * <li>{@link MigrationStrategy#migrate()}: this signals Flink that migration needs to be performed, because + * this serializer is not compatible, or cannot be reconfigured to be compatible, for old data.</li> + * </ul> + * + * <p>This method is guaranteed to only be invoked if the preceding serializer's configuration snapshot is not the + * singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such cases, Flink always + * assume that the migration strategy is {@link MigrationStrategy#migrate()}. + * + * @see MigrationStrategy + * + * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state + * + * @return the result of the reconfiguration. + */ + protected abstract MigrationStrategy<T> getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot); + + /** + * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding + * serializer that was registered for serialization of the same managed state (if any - this method is only + * relevant if this serializer is registered for serialization of managed state). + * + * <p>This method is not part of the public user-facing API, and cannot be overriden. External operations + * providing a configuration snapshot of preceding serializer can only do so through this method. + * + * <p>This method always assumes that the migration strategy is {@link MigrationStrategy#noMigration()} if + * the provided configuration snapshot is the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} . + * Otherwise, the configuration snapshot is provided to the actual + * {@link #getMigrationStrategy(TypeSerializerConfigSnapshot)} (TypeSerializerConfigSnapshot)} implementation. 
+ * + * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state + * + * @return the result of the reconfiguration. + */ + @Internal + public final MigrationStrategy<T> getMigrationStrategyFor(TypeSerializerConfigSnapshot configSnapshot) { + // reference equality is viable here, because the forward compatible + // marker config will always be explicitly restored with the singleton instance + if (configSnapshot != ForwardCompatibleSerializationFormatConfig.INSTANCE) { End diff – I agree. I've removed the shortcut and have only a single public abstract method now.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3834#discussion_r115141585

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java —
          @@ -161,7 +162,93 @@

          public abstract int hashCode();

          • public boolean canRestoreFrom(TypeSerializer<?> other) {
          • return equals(other);
            + // --------------------------------------------------------------------------------------------
            + // Serializer configuration snapshotting & reconfiguring
            + // --------------------------------------------------------------------------------------------
            +
            + /**
            + * Create a snapshot of the serializer's current configuration to be stored along with the managed state it is
            + * registered to (if any - this method is only relevant if this serializer is registered for serialization of
            + * managed state).
            + *
            + * <p>The configuration snapshot should contain information about the serializer's parameter settings and its
            + * serialization format. When a new serializer is registered to serialize the same managed state that this
            + * serializer was registered to, the returned configuration snapshot can be used to check with the new serializer
            + * if any data migration needs to take place.
            + *
            + * <p>Implementations can also return the singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}
            + * configuration if they guarantee forwards compatibility. For example, implementations that use serialization
            + * frameworks with built-in serialization compatibility, such as <a href=https://thrift.apache.org/>Thrift</a> or
            + * <a href=https://developers.google.com/protocol-buffers/>Protobuf</a>, is suitable for this usage pattern. By
            + * returning the {@link ForwardCompatibleSerializationFormatConfig#INSTANCE}, this informs Flink that when managed
            + * state serialized using this serializer is restored, there is no need to check for migration with the new
            + * serializer for the same state. In other words, new serializers are always assumed to be fully compatible for the
            + * serialized state.
            + *
            + * @see TypeSerializerConfigSnapshot
            + * @see ForwardCompatibleSerializationFormatConfig
            + *
            + * @return snapshot of the serializer's current configuration.
            + */
            + public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
            +
            + /**
            + * Get the migration strategy to use this serializer based on the configuration snapshot of a preceding
            + * serializer that was registered for serialization of the same managed state (if any - this method is only
            + * relevant if this serializer is registered for serialization of managed state).
            + *
            + * <p>Implementations need to return the resolved migration strategy. The strategy can be one of the following:
            + * <ul>
            + * <li>{@link MigrationStrategy#noMigration()}: this signals Flink that this serializer is compatible, or
            + * has been reconfigured to be compatible, to continue reading old data, and that the
            + * serialization schema remains the same. No migration needs to be performed.</li>
            + *
            + * <li>{@link MigrationStrategy#migrateWithFallbackDeserializer(TypeSerializer)}: this signals Flink that
            + * migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
            + * compatible, for old data. Furthermore, in the case that the preceding serializer cannot be found or
            + * restored to read the old data, the provided fallback deserializer can be used.</li>
            + *
            + * <li>

            {@link MigrationStrategy#migrate()}: this signals Flink that migration needs to be performed, because
            + * this serializer is not compatible, or cannot be reconfigured to be compatible, for old data.</li>
            + * </ul>
            + *
            + * <p>This method is guaranteed to only be invoked if the preceding serializer's configuration snapshot is not the
            + * singleton {@link ForwardCompatibleSerializationFormatConfig#INSTANCE} configuration. In such cases, Flink always
            + * assumes that the migration strategy is {@link MigrationStrategy#migrate()}.
            + *
            + * @see MigrationStrategy
            + *
            + * @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
            + *
            + * @return the result of the reconfiguration.
            + */
            + protected abstract MigrationStrategy<T> getMigrationStrategy(TypeSerializerConfigSnapshot configSnapshot);

              • End diff –

          Agreed. Refined the naming of this with 81dd0eb
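          The two methods in the diff can be illustrated with a small, self-contained Java sketch that does not depend on Flink. `ConfigSnapshot`, `Deserializer`, `MigrationStrategy` and `StringSerializerV2` are hypothetical stand-ins that only mirror the names in the diff (the fallback takes a plain `Deserializer` rather than a `TypeSerializer`), the version-1/version-2 string formats are invented for illustration, and since the naming was refined before merging (81dd0eb), the merged API may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical stand-in for TypeSerializerConfigSnapshot: it only records a format version.
class ConfigSnapshot {
    final int formatVersion;

    ConfigSnapshot(int formatVersion) {
        this.formatVersion = formatVersion;
    }
}

// Hypothetical stand-in for the read side of a TypeSerializer.
interface Deserializer<T> {
    T deserialize(byte[] bytes);
}

// Hypothetical mirror of MigrationStrategy<T> with the three outcomes listed in the Javadoc.
final class MigrationStrategy<T> {
    enum Kind { NO_MIGRATION, MIGRATE_WITH_FALLBACK, MIGRATE }

    final Kind kind;
    final Deserializer<T> fallback; // non-null only for MIGRATE_WITH_FALLBACK

    private MigrationStrategy(Kind kind, Deserializer<T> fallback) {
        this.kind = kind;
        this.fallback = fallback;
    }

    static <T> MigrationStrategy<T> noMigration() {
        return new MigrationStrategy<>(Kind.NO_MIGRATION, null);
    }

    static <T> MigrationStrategy<T> migrateWithFallbackDeserializer(Deserializer<T> fallback) {
        return new MigrationStrategy<>(Kind.MIGRATE_WITH_FALLBACK, Objects.requireNonNull(fallback));
    }

    static <T> MigrationStrategy<T> migrate() {
        return new MigrationStrategy<>(Kind.MIGRATE, null);
    }
}

// Toy string serializer at (hypothetical) format version 2. Only the two configuration
// methods from the diff are modeled; actual (de)serialization is omitted.
class StringSerializerV2 {
    static final int FORMAT_VERSION = 2;

    // Stored in the checkpoint alongside the state this serializer is registered for.
    ConfigSnapshot snapshotConfiguration() {
        return new ConfigSnapshot(FORMAT_VERSION);
    }

    // Called on restore with the snapshot taken by the preceding serializer.
    MigrationStrategy<String> getMigrationStrategy(ConfigSnapshot previous) {
        if (previous.formatVersion == FORMAT_VERSION) {
            // Same schema: keep reading the old bytes as-is.
            return MigrationStrategy.noMigration();
        }
        if (previous.formatVersion == 1) {
            // Hypothetical version 1 stored strings as ISO-8859-1. Migration is needed, but this
            // serializer can still read the old bytes if the preceding serializer cannot be restored.
            return MigrationStrategy.migrateWithFallbackDeserializer(
                    bytes -> new String(bytes, StandardCharsets.ISO_8859_1));
        }
        // Unknown older format: migration is needed and no fallback reader is offered.
        return MigrationStrategy.migrate();
    }
}

class MigrationStrategyDemo {
    public static void main(String[] args) {
        StringSerializerV2 serializer = new StringSerializerV2();
        ConfigSnapshot current = serializer.snapshotConfiguration();
        System.out.println(serializer.getMigrationStrategy(current).kind);               // NO_MIGRATION
        System.out.println(serializer.getMigrationStrategy(new ConfigSnapshot(1)).kind); // MIGRATE_WITH_FALLBACK
        System.out.println(serializer.getMigrationStrategy(new ConfigSnapshot(0)).kind); // MIGRATE
    }
}
```

          The sketch offers a fallback reader only for a format version the new serializer actually understands, and otherwise returns plain `migrate()` as the conservative default.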

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3834

          I was already familiar with the high-level design of this PR from my discussions with @tzulitai. As the feature freeze is already tomorrow, I had to focus my review on the functionality of the design (i.e. whether all the backwards-compatibility cases that we want to cover are possible, and whether future plans like state transformation can build upon this work). I also had a look into a couple of important tests, but could not go through all the implementation details for now.

          Overall, I think this is very good work and improves a ton of things w.r.t. everything that has to do with serialization format updates and backwards compatibility. I suggest that we proceed to merge this, so that the code makes it into the release. We can still do potential refinements and fixes in the QA phase, in case they are needed.

          @tzulitai if you agree, please feel free to merge this. +1

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3834

          Thanks a lot @StefanRRichter for the review, especially over the weekend.

          I'll proceed to merge this after also addressing the comment on the priority flow between the serialized old serializer and the convert serializer.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3834

          tzulitai Tzu-Li (Gordon) Tai added a comment - Merged for 1.3.0 with https://git-wip-us.apache.org/repos/asf/flink/commit/63c04a5

            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              tzulitai Tzu-Li (Gordon) Tai
            • Votes:
              0
              Watchers:
              2
