FLINK-6803

Add test for PojoSerializer when Pojo changes

    Details

      Description

      We should add test cases for the PojoSerializer when the underlying Pojo type changes in order to test the proper behaviour of the serializer.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/4044

          FLINK-6803 [tests] Add test for PojoSerializer state upgrade

          The added PojoSerializerUpgradeTest tests the state migration behaviour when the
          underlying pojo type changes and one tries to recover from old state. Currently
          not all tests could be activated because there are still some pending issues to be
          fixed first. We should arm these tests once the issues have been fixed.

          cc @tzulitai

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink addTypeSerializerTests

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4044.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4044


          commit e2b2fc2ba6ec4a20ff04d47491c5d135e056b8e9
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-31T13:14:11Z

          FLINK-6796 [tests] Use Environment's class loader in AbstractStreamOperatorTestHarness

          Generalize KeyedOneInputStreamOperatorTestHarness

          Generalize AbstractStreamOperatorTestHarness

          commit 8ccaba6f6dede0f801608d7fafea5904e81b1624
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-31T16:37:12Z

          FLINK-6803 [tests] Add test for PojoSerializer state upgrade

          The added PojoSerializerUpgradeTest tests the state migration behaviour when the
          underlying pojo type changes and one tries to recover from old state. Currently
          not all tests could be activated because there are still some pending issues to be
          fixed first. We should arm these tests once the issues have been fixed.


          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4044#discussion_r120007888

          --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---
          @@ -192,7 +187,7 @@ public StreamStatus getStreamStatus() {
            when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
            when(mockTask.getEnvironment()).thenReturn(environment);
            when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
          - when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
          + when(mockTask.getUserCodeClassLoader()).thenReturn(environment.getUserClassLoader());
          --- End diff --

          Some tests are failing because of this change.

          I think the problem is that the given environment may itself be a mock, and it is called here while the stubbing of `mockTask` isn't completed yet, leading to an `org.mockito.exceptions.misusing.UnfinishedStubbingException`.

          We can avoid that by doing this:
          ```
          ClassLoader cl = environment.getUserClassLoader();
          when(mockTask.getUserCodeClassLoader()).thenReturn(cl);
          ```
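
The effect of the suggested reordering can be seen without Mockito. The following is only a toy sketch: `when`, `Stubbing`, `taskCall` and `envCall` are hypothetical stand-ins that record the order of calls, not Mockito's API. It relies solely on Java's left-to-right evaluation order, under which the argument of `thenReturn(...)` is evaluated after `when(...)` has already been invoked.

```java
import java.util.ArrayList;
import java.util.List;

public class StubbingOrderSketch {
    // Records the order of calls; a stand-in for a mocking framework's stubbing state.
    static final List<String> calls = new ArrayList<>();

    // Hypothetical stand-in for the call on mockTask that is being stubbed.
    static Object taskCall() {
        calls.add("mockTask.getUserCodeClassLoader()");
        return null;
    }

    // Hypothetical stand-in for a call on an Environment that may itself be a mock.
    static ClassLoader envCall() {
        calls.add("environment.getUserClassLoader()");
        return StubbingOrderSketch.class.getClassLoader();
    }

    // Toy replacement for an ongoing stubbing; it only records that thenReturn ran.
    static class Stubbing {
        void thenReturn(Object value) {
            calls.add("thenReturn");
        }
    }

    // Toy replacement for when(...); it only records that stubbing has started.
    static Stubbing when(Object ignored) {
        calls.add("when");
        return new Stubbing();
    }

    // Inline form: the environment is called between when(...) and thenReturn(...).
    static List<String> inlineOrder() {
        calls.clear();
        when(taskCall()).thenReturn(envCall());
        return new ArrayList<>(calls);
    }

    // Hoisted form: the environment is called before any stubbing starts.
    static List<String> hoistedOrder() {
        calls.clear();
        ClassLoader cl = envCall();
        when(taskCall()).thenReturn(cl);
        return new ArrayList<>(calls);
    }

    public static void main(String[] args) {
        System.out.println(inlineOrder());
        System.out.println(hoistedOrder());
    }
}
```

In the inline form the environment call falls inside the window where a stubbing is still open; in the hoisted form it happens before that window, which is what the two-line change above achieves.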

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4044#discussion_r120008474

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
          @@ -1804,53 +1805,44 @@ public void testKeyGroupSnapshotRestore() throws Exception {
            }

            @Test
          - public void testRestoreWithWrongKeySerializer() {
          -   try {
          -     CheckpointStreamFactory streamFactory = createStreamFactory();
          + public void testRestoreWithWrongKeySerializer() throws Exception {
          +   CheckpointStreamFactory streamFactory = createStreamFactory();

          -     // use an IntSerializer at first
          -     AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
          +   // use an IntSerializer at first
          +   AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

          -     ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
          +   ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);

          -     ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
          +   ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);

          -     // write some state
          -     backend.setCurrentKey(1);
          -     state.update("1");
          -     backend.setCurrentKey(2);
          -     state.update("2");
          +   // write some state
          +   backend.setCurrentKey(1);
          +   state.update("1");
          +   backend.setCurrentKey(2);
          +   state.update("2");

          -     // draw a snapshot
          -     KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
          +   // draw a snapshot
          +   KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));

          -     backend.dispose();
          +   backend.dispose();

          -     // restore with the wrong key serializer
          -     try {
          -       restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1);
          +   // restore with the wrong key serializer
          +   try {
          +     restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1);

          -       fail("should recognize wrong key serializer");
          -     } catch (RuntimeException e) {
          -       if (!e.getMessage().contains("The new key serializer is not compatible")) {
          -         fail("wrong exception " + e);
          -       }
          -       // expected
          -     }
          -   }
          -   catch (Exception e) {
          -     e.printStackTrace();
          -     fail(e.getMessage());
          +     fail("should recognize wrong key serializer");
          +   } catch (StateMigrationException ignored) {
          --- End diff --

          This change is failing because the `RocksDBKeyedStateBackend` is not throwing the new exception when checking key serializers.
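
The failure mode described above can be sketched in plain Java. Everything here is a hypothetical stand-in rather than Flink code: the local `StateMigrationException`, the `Restorer` interface, and the two restore methods (one throwing the dedicated exception, one still throwing a `RuntimeException`) exist only to illustrate why a test that catches the new exception type fails against a backend that has not been updated.

```java
public class WrongSerializerSketch {
    // Hypothetical stand-in for the exception type the updated test expects.
    static class StateMigrationException extends Exception {
        StateMigrationException(String message) {
            super(message);
        }
    }

    // Hypothetical restore hook, parameterized by serializer names for brevity.
    interface Restorer {
        void restore(String snapshotKeySerializer, String newKeySerializer) throws Exception;
    }

    // Assumed behaviour of an updated backend: the dedicated exception is thrown.
    static void restoreNew(String snapshotKeySerializer, String newKeySerializer)
            throws StateMigrationException {
        if (!snapshotKeySerializer.equals(newKeySerializer)) {
            throw new StateMigrationException("The new key serializer is not compatible");
        }
    }

    // Assumed behaviour of a backend that was not updated: a RuntimeException is thrown.
    static void restoreLegacy(String snapshotKeySerializer, String newKeySerializer) {
        if (!snapshotKeySerializer.equals(newKeySerializer)) {
            throw new RuntimeException("The new key serializer is not compatible");
        }
    }

    // Mirrors the try/fail/catch shape of the test: true only if the dedicated
    // exception is thrown; any other exception propagates to the caller.
    static boolean rejectedAsMigrationFailure(Restorer restorer, String snapshotKeySerializer,
            String newKeySerializer) throws Exception {
        try {
            restorer.restore(snapshotKeySerializer, newKeySerializer);
            return false; // corresponds to fail("should recognize wrong key serializer")
        } catch (StateMigrationException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(rejectedAsMigrationFailure(
                WrongSerializerSketch::restoreNew, "IntSerializer", "DoubleSerializer"));
        try {
            rejectedAsMigrationFailure(
                    WrongSerializerSketch::restoreLegacy, "IntSerializer", "DoubleSerializer");
        } catch (RuntimeException e) {
            // The legacy exception is not caught by the test's catch clause.
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```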

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4044

          I'll merge this (will address my own comments) together with the other pending `PojoSerializer` fixes.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/4044

          Thanks for the review and for merging the PR, @tzulitai.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4044

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Fixed for 1.3.1 via 1bdd19d044d18e81b4d2c4016ff38e6d86c6f609.
          Fixed for master via 8b26460b72bc4322aea7a9feaa3a728646c0399a.


            People

            • Assignee: Till Rohrmann
            • Reporter: Till Rohrmann