FLINK-6804: Inconsistent state migration behaviour between different state backends

    Details

      Description

      The MemoryStateBackend, FsStateBackend and RocksDBStateBackend behave differently when recovering from old state and migrating state. For example, with the MemoryStateBackend it is possible to recover POJOs whose class has gained additional fields by recovery time. The only caveat is that the recovered PojoSerializer silently drops the added fields when writing the new POJO. In contrast, the RocksDBStateBackend correctly recognizes that a state migration is necessary and fails with a StateMigrationException. The same applies when POJO field types change: the MemoryStateBackend and the FsStateBackend accept such a change as long as the fields still have the same length, whereas the RocksDBStateBackend correctly fails with a StateMigrationException.

      I think that all state backends should behave the same way and give the user the same recovery and state migration guarantees. Otherwise, a job could run with one state backend but not with another (with respect to semantic behaviour).
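To illustrate why a field type change of the same length can go unnoticed, here is a minimal, Flink-independent sketch. It only assumes that the old field was written as a 4-byte `int` and that the new code reads the same bytes as a `float`; the class and method names are hypothetical and this is not how any Flink backend is implemented.

```java
import java.nio.ByteBuffer;

final class SameLengthFieldDemo {

    /** Old layout (assumed): the field is written as a 4-byte int. */
    static byte[] writeAsInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    /** New layout (assumed): the same 4 bytes are read back as a float. */
    static float readAsFloat(byte[] data) {
        return ByteBuffer.wrap(data).getFloat();
    }

    public static void main(String[] args) {
        byte[] oldState = writeAsInt(42);
        float restored = readAsFloat(oldState);

        // No exception is thrown: the lengths match, so the read succeeds,
        // but the bits of the int are reinterpreted as a float.
        System.out.println(restored == Float.intBitsToFloat(42)); // prints "true"
        System.out.println(restored == 42.0f);                    // prints "false"
    }
}
```

The read succeeds because nothing in the byte stream records the field type, which is why an explicit compatibility check is needed to turn this silent reinterpretation into a visible failure.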

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4073

          FLINK-6804 [state] Consistent state migration behaviour across state backends

          This PR is based on #4044, in which @tillrohrmann added the ITCases for upgrading POJO types w.r.t. state migration.

          This PR collects several follow-ups that work towards one goal: making state migration behaviour consistent across all state backends.

          The extra commits added onto #4044 are as follows:

          • f568252 and 02a360d: fix tests that failed due to changes in #4044, and extend the ITCases of #4044 with equivalent tests that register POJOs as operator state (also disabled because they do not pass yet).
          • ba00f8e and dd81295: fix the issues in `PojoSerializer` that caused the tests to fail. The deserialization of `PojoSerializer` and `PojoSerializerConfigSnapshot` is now resilient to missing fields.
          • 36d87a0: adds compatibility check code paths for `DefaultOperatorStateBackend` and `HeapKeyedStateBackend`. Note that these checks are not strictly required, since for the memory backends all state is read into objects on restore and the job can always just use the new serializer to continue. The additional checks make the behaviour of state migration consistent across all backends.
          • 0a045b3: fully enables the ITCases from #4044 and 02a360d to verify the fixes.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink pojoserializer-fixes

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4073.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4073


          commit 5d825c1b560c56e1a8f137b8c939e6ded16c5505
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-31T13:14:11Z

          FLINK-6796 [tests] Use Environment's class loader in AbstractStreamOperatorTestHarness

          Generalize KeyedOneInputStreamOperatorTestHarness

          Generalize AbstractStreamOperatorTestHarness

          commit 30b43bf81a45131ddf5137d33d47265cc69713f8
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-31T16:37:12Z

          FLINK-6803 [tests] Add test for PojoSerializer state upgrade

          The added PojoSerializerUpgradeTest tests the state migration behaviour when the
          underlying POJO type changes and one tries to recover from old state. Currently
          not all tests can be activated, because there are still some pending issues to
          be fixed first. We should arm these tests once the issues have been fixed.

          commit f568252169ecbf07b76a227a596bb148804b6741
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-04T13:51:12Z

          [hotfix] [tests] Fix failing tests in AsyncWaitOperatorTest and StateBackendTestBase

          commit 02a360d98b6dc56a1d9a505411328ba405c78999
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-04T17:32:53Z

          FLINK-6803 [tests] Enhancements to PojoSerializerUpgradeTest

          1. Allow tests to ignore missing fields.
          2. Add equivalent tests which use POJOs as managed operator state.

          For 2, all tests have to be ignored for now until FLINK-6804 is fixed.

          commit ba00f8e34b4bb0651e63b84d13aaac3b5b3d3faa
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-04T10:30:58Z

          FLINK-6801 [core] Relax missing fields check when reading PojoSerializerConfigSnapshot

          Prior to this commit, when reading the PojoSerializerConfigSnapshot, if
          the underlying POJO type has a missing field, then the read would fail.
          Failing the deserialization of the config snapshot is too severe,
          because that would leave no opportunity to restore the checkpoint at
          all, whereas we should be able to restore the config and provide it to
          the new PojoSerializer for the chance of getting a convert deserializer.

          This commit changes this by only restoring the field names when reading
          the PojoSerializerConfigSnapshot. In PojoSerializer.ensureCompatibility,
          the field name is used to look up the fields of the new PojoSerializer.
          This change does not change the serialization format of the
          PojoSerializerConfigSnapshot.
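The name-based lookup described in this commit message can be sketched roughly as follows. This is a simplified illustration, not the actual `PojoSerializer.ensureCompatibility` code: field lists are modelled as plain lists of names, and `FieldNameLookupSketch` and `lookUp` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

final class FieldNameLookupSketch {

    /**
     * For each field name restored from the snapshot, find its position in the
     * new serializer's field list. A position of -1 marks a field that no
     * longer exists in the new POJO type.
     */
    static int[] lookUp(List<String> snapshotFieldNames, List<String> newFieldNames) {
        int[] positions = new int[snapshotFieldNames.size()];
        for (int i = 0; i < positions.length; i++) {
            positions[i] = newFieldNames.indexOf(snapshotFieldNames.get(i));
        }
        return positions;
    }

    public static void main(String[] args) {
        // Snapshot was taken when the POJO had three fields; "score" was removed since.
        List<String> snapshot = Arrays.asList("id", "name", "score");
        List<String> current = Arrays.asList("name", "id");

        System.out.println(Arrays.toString(lookUp(snapshot, current))); // prints "[1, 0, -1]"
    }
}
```

Because only names are restored, reading the snapshot cannot fail on a missing field; the decision about what a missing field means is deferred to the compatibility check.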

          commit dd8129569de749b8173b7fa3de5eee0c46b9ebe4
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-04T18:41:59Z

          FLINK-6801 [core] Allow deserialized PojoSerializer to have removed fields

          Prior to this commit, deserializing the PojoSerializer would fail when
          we encounter a missing field that existed in the POJO type before. It is
          actually perfectly fine to have a missing field; the deserialized
          PojoSerializer should simply skip reading the removed field's previously
          serialized values, i.e. much like how Java Object Serialization works.

          This commit relaxes the deserialization of the PojoSerializer, so that a
          null will be used as a placeholder value to indicate a removed field
          that previously existed. De-/serialization and copying methods on the
          PojoSerializer will respect null Fields and simply skip them.
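A minimal sketch of the null-placeholder idea from this commit message, independent of Flink. It assumes a fixed old byte layout (`int id`, `long legacyCounter`, `int score`) and a new `Event` class without `legacyCounter`; all class, field, and method names are hypothetical. A removed field's bytes are still consumed so the stream stays aligned, but the value is discarded.

```java
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class RemovedFieldSkipSketch {

    /** New version of the POJO: "legacyCounter" has been removed. */
    static final class Event {
        public int id;
        public int score;
    }

    /** Returns the public field with the given name, or null if it no longer exists. */
    static Field fieldOrNull(Class<?> type, String name) {
        try {
            return type.getField(name);
        } catch (NoSuchFieldException e) {
            return null; // placeholder for a removed field
        }
    }

    /** Reads one Event; values belonging to null (removed) fields are read and dropped. */
    static Event read(ByteBuffer in, Field[] fields, List<Function<ByteBuffer, Object>> readers) {
        Event target = new Event();
        for (int i = 0; i < fields.length; i++) {
            Object value = readers.get(i).apply(in); // always consume the old bytes
            if (fields[i] != null) {
                try {
                    fields[i].set(target, value);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return target;
    }

    /** Readers matching the assumed old layout: int, long, int. */
    static List<Function<ByteBuffer, Object>> oldLayoutReaders() {
        List<Function<ByteBuffer, Object>> readers = new ArrayList<>();
        readers.add(buffer -> buffer.getInt());
        readers.add(buffer -> buffer.getLong());
        readers.add(buffer -> buffer.getInt());
        return readers;
    }

    public static void main(String[] args) {
        ByteBuffer old = ByteBuffer.allocate(16);
        old.putInt(7).putLong(123L).putInt(99);
        old.flip();

        Field[] fields = {
            fieldOrNull(Event.class, "id"),
            fieldOrNull(Event.class, "legacyCounter"), // resolves to null
            fieldOrNull(Event.class, "score")
        };
        Event event = read(old, fields, oldLayoutReaders());
        System.out.println(event.id + " " + event.score); // prints "7 99"
    }
}
```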

          commit 36d87a06c08bc4df47a680c4028145e4eac09155
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-04T20:40:26Z

          FLINK-6804 [state] Consistent state migration behaviour across state backends

          Prior to this commit, memory and non-memory state backends behaved
          differently w.r.t. state migration. For the memory backends, we did
          not require the new serializer to be compatible in order for the job to
          proceed after restore, because all state has already been deserialized
          to objects and the new serializer can always just be used as is.
          Therefore, the compatibility checks were not performed for the memory
          backends, resulting in different code paths between the different state
          backends.

          However, this inconsistent behaviour across backends will be confusing
          for users. This commit adds the code path to check the newly registered
          serializer's compatibility in the memory backends (even though it isn't
          required), and deliberately fails the job if the new serializer is
          incompatible.

          Note that the compatibility code paths will be truly unified and
          required for all backends once we have eager state registration.
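The behaviour described in this commit message could be modelled roughly like this. It is a toy sketch under stated assumptions, not the code of `HeapKeyedStateBackend` or `DefaultOperatorStateBackend`: serializer compatibility is reduced to comparing a format identifier, and `ToySerializer`, `ToyStateMigrationException`, `restore`, and `register` are all hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class HeapBackendCompatibilitySketch {

    /** Hypothetical stand-in for a serializer; it only carries a format identifier. */
    static final class ToySerializer {
        final String formatId;

        ToySerializer(String formatId) {
            this.formatId = formatId;
        }

        boolean isCompatibleWith(String restoredFormatId) {
            return formatId.equals(restoredFormatId);
        }
    }

    /** Hypothetical exception, loosely named after the one mentioned in the issue. */
    static final class ToyStateMigrationException extends RuntimeException {
        ToyStateMigrationException(String message) {
            super(message);
        }
    }

    // State name -> format identifier recorded in the restored checkpoint.
    private final Map<String, String> restoredFormats = new HashMap<>();
    // State name -> serializer registered by the running job.
    private final Map<String, ToySerializer> registered = new HashMap<>();

    void restore(String stateName, String formatId) {
        restoredFormats.put(stateName, formatId);
    }

    /**
     * Registers a serializer for a state. The restored objects are already on
     * the heap, so the check is not technically needed here; it exists only so
     * that this backend fails in the same situations as a byte-based backend.
     */
    void register(String stateName, ToySerializer newSerializer) {
        String restored = restoredFormats.get(stateName);
        if (restored != null && !newSerializer.isCompatibleWith(restored)) {
            throw new ToyStateMigrationException(
                "State '" + stateName + "' was written with " + restored
                    + " but the new serializer uses " + newSerializer.formatId);
        }
        registered.put(stateName, newSerializer);
    }

    boolean isRegistered(String stateName) {
        return registered.containsKey(stateName);
    }

    public static void main(String[] args) {
        HeapBackendCompatibilitySketch backend = new HeapBackendCompatibilitySketch();
        backend.restore("counts", "pojo-v1");
        backend.register("counts", new ToySerializer("pojo-v1")); // accepted
        try {
            backend.register("counts", new ToySerializer("pojo-v2"));
        } catch (ToyStateMigrationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing deliberately trades some flexibility in the memory backends for predictability: a job that restores with one backend restores with the others too.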

          commit 0a045b335a9d424f1d0f566bdcc9ba64695833d1
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-04T20:48:45Z

          FLINK-6803 [tests] Fully enable PojoSerializerUpgradeTests for all state backends

          With the fixes for the PojoSerializer in, this commit fully enables all
          tests for upgrading the PojoSerializer for all state backends, which
          could not pass before.


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4073

          Some CEP migration tests are failing due to FLINK-6853. Rebasing this PR to include the fix for that.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4073

          (the rebased commits appear to be displayed in the wrong order; Github uses chronological ordering after rebases. Please see description for a better overview of the changes).

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/4073

          Very good work. I think this properly addresses the mentioned issues. +1

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4073

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for 1.3.1 via 379be13b67948d28be66e071072412c870d6e1f8.
          Fixed for master via f0f2e99b6c829c4f4e2ca47c7647a64fe0c9d808.


            People

            • Assignee:
              tzulitai Tzu-Li (Gordon) Tai
              Reporter:
              till.rohrmann Till Rohrmann