Flink / FLINK-6018

Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()

    Details

      Description

      The `AbstractKeyedStateBackend#getPartitionedState` method currently contains the following snippet:

      // TODO: This is wrong, it should throw an exception that the initialization has not properly happened
      if (!stateDescriptor.isSerializerInitialized()) {
          stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
      }
      

      Method `isSerializerInitialized`:

      public boolean isSerializerInitialized() {
          return serializer != null;
      }


      Method `initializeSerializerUnlessSet`:

      public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
          if (serializer == null) {
              if (typeInfo != null) {
                  serializer = typeInfo.createSerializer(executionConfig);
              } else {
                  throw new IllegalStateException(
                          "Cannot initialize serializer after TypeInformation was dropped during serialization");
              }
          }
      }
      

      That is, `initializeSerializerUnlessSet` already checks `serializer == null` itself, so the outer `isSerializerInitialized()` check is redundant. I would therefore like to improve this code in one of the following ways.

      Approach 1:
      Follow the `TODO` comment and throw an exception:

      if (!stateDescriptor.isSerializerInitialized()) {
          throw new IllegalStateException("The serializer of the descriptor has not been initialized!");
      }


      Approach 2:
      Remove the `if (!stateDescriptor.isSerializerInitialized()) {` check and always attempt the initialization:

      stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());

      If we go with approach 2, I also suggest adding a `private final ExecutionConfig executionConfig` field to `AbstractKeyedStateBackend`, so that the call uses the backend's configuration instead of a newly created one:

      stateDescriptor.initializeSerializerUnlessSet(executionConfig);


      Do these suggestions seem reasonable?
      Feedback and corrections are welcome.
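The difference between the two approaches can be illustrated with a small, self-contained sketch. It does not use Flink's classes: `SketchDescriptor`, `approachOne` and `approachTwo` are hypothetical stand-ins, and the execution config and serializer are modelled as plain strings purely for illustration.

```java
import java.util.function.Function;

public class DescriptorInitSketch {

    /** Hypothetical stand-in for StateDescriptor; config and serializer are plain strings here. */
    static final class SketchDescriptor {
        // Stands in for TypeInformation#createSerializer(ExecutionConfig).
        private final Function<String, String> typeInfo;
        private String serializer;

        SketchDescriptor(Function<String, String> typeInfo) {
            this.typeInfo = typeInfo;
        }

        boolean isSerializerInitialized() {
            return serializer != null;
        }

        // Already guards on serializer == null, so callers need no outer check.
        void initializeSerializerUnlessSet(String executionConfig) {
            if (serializer == null) {
                if (typeInfo == null) {
                    throw new IllegalStateException("Cannot initialize serializer: type information is missing");
                }
                serializer = typeInfo.apply(executionConfig);
            }
        }

        String getSerializer() {
            return serializer;
        }
    }

    /** Approach 1: fail fast when the caller did not initialize the descriptor. */
    static void approachOne(SketchDescriptor descriptor) {
        if (!descriptor.isSerializerInitialized()) {
            throw new IllegalStateException("The serializer of the descriptor has not been initialized!");
        }
    }

    /** Approach 2: always delegate; the call is a no-op when a serializer is already set. */
    static void approachTwo(SketchDescriptor descriptor, String backendConfig) {
        descriptor.initializeSerializerUnlessSet(backendConfig);
    }

    public static void main(String[] args) {
        SketchDescriptor fresh = new SketchDescriptor(cfg -> "serializer[" + cfg + "]");

        try {
            approachOne(fresh);
        } catch (IllegalStateException e) {
            System.out.println("approach 1 rejected: " + e.getMessage());
        }

        approachTwo(fresh, "job-config");
        System.out.println(fresh.getSerializer()); // prints "serializer[job-config]"

        // A second call with a different config changes nothing: the serializer is already set.
        approachTwo(fresh, "other-config");
        System.out.println(fresh.getSerializer()); // prints "serializer[job-config]"
    }
}
```

In this sketch, approach 1 surfaces a missing initialization to the caller, while approach 2 silently initializes with whatever config the backend passes in, which is why it matters that this is the job's config rather than a fresh default.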

          Activity

          tzulitai Tzu-Li (Gordon) Tai added a comment -

          Resolved for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/09164cf
          Thanks for the fixes, sunjincheng and Aljoscha Krettek!

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3603

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3534

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3603

          There are some failing tests, but unrelated to the changes. Merging this!

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3603

          Thanks for the review!
          I'll wait for local Travis to give its approval and then merge this: https://travis-ci.org/tzulitai/flink/builds/214276246

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3603

          Your fixes (and especially comments) look good. 👍

          I think we can merge this.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3603#discussion_r107692914

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
          @@ -368,18 +388,29 @@ public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exc
          }

          try {
          + // backends that lazily serializes (such as memory state backend) will fail here
          runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
          } catch (ExpectedKryoTestException e) {
          numExceptions++;
          + } catch (Exception e) {
          + if (e.getCause() instanceof ExpectedKryoTestException) {
          + numExceptions++;
          + } else {
          + throw e;
          + }
          }

          assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
          }

          /**
          - * Verify that we can restore a snapshot that was done with without registered types
          - * after registering types.
          + * Verify state restore resilience when:
          + * - snapshot was taken without any Kryo registrations, specific serializers or default serializers for the state type
          + * - restored with the state type registered (no specific serializer)
          + *
          + * This test should not fail, because de- / serialization of the state should noth be performed with Kryo's default
          --- End diff --

          Typo: noth
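The catch-and-count logic in the diff above can be sketched in isolation. This is only an illustration of the pattern, not the test code itself: `ExpectedTestException` and `countExpected` are hypothetical names, and the two lambdas merely simulate a state update that succeeds and a snapshot that fails with a wrapped exception.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class ExpectedExceptionCounter {

    /** Hypothetical marker exception, standing in for the test's ExpectedKryoTestException. */
    static final class ExpectedTestException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /**
     * Runs the step and returns 1 if it failed with the expected exception,
     * either thrown directly or wrapped as the cause of another exception.
     * Returns 0 if the step succeeded and rethrows anything unrelated.
     */
    static int countExpected(Callable<?> step) throws Exception {
        try {
            step.call();
            return 0;
        } catch (ExpectedTestException e) {
            return 1;
        } catch (Exception e) {
            if (e.getCause() instanceof ExpectedTestException) {
                return 1;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        int numExceptions = 0;

        // Simulates a lazily serializing backend: the update itself does not fail.
        numExceptions += countExpected(() -> "updated");

        // Simulates the snapshot failing with the expected exception wrapped in another one.
        numExceptions += countExpected(() -> {
            throw new IOException("snapshot failed", new ExpectedTestException());
        });

        System.out.println(numExceptions); // prints 1
    }
}
```

Counting across both steps and asserting a total of exactly one keeps the check independent of whether a backend serializes eagerly (failure on update) or lazily (failure on snapshot).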

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha closed the pull request at:

          https://github.com/apache/flink/pull/3562

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3562

          Closing in favour of #3603

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3562

          Done. Re-opened at #3603.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/3603

          FLINK-6018 Add tests for KryoSerializer restore with registered types

          (This PR is a re-opened version of #3562, including @aljoscha's initial work and my follow-up fixes. The below description is directly copied from #3562.)

          I changed `TypeSerializer.isCompatibleWith()` to `TypeSerializer.canRestoreFrom` because the relation is not necessarily symmetric.

          I added a `KryoSerializer.canRestoreFrom()` that only allows restoring when we previously didn't have registered types/serializers.

          I added a whole bunch of tests in `StateBackendTestBase`, this should be reviewed most thoroughly.
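To see why such a check is not symmetric, consider the following minimal sketch. It is not Flink's `KryoSerializer`: `SketchSerializer` is a hypothetical class described only by a set of registered type names, and its rule is an assumption derived from the sentence above (restoring is allowed only if the previous serializer had no registrations).

```java
import java.util.Collections;
import java.util.Set;

public class RestoreCompatibilitySketch {

    /** Hypothetical stand-in for a Kryo-backed serializer, described only by its registered type names. */
    static final class SketchSerializer {
        private final Set<String> registeredTypes;

        SketchSerializer(Set<String> registeredTypes) {
            this.registeredTypes = registeredTypes;
        }

        /** Returns true if this serializer may read state written by the previous serializer. */
        boolean canRestoreFrom(SketchSerializer previous) {
            // Assumed rule: only state written without any registrations can be restored.
            return previous.registeredTypes.isEmpty();
        }
    }

    public static void main(String[] args) {
        SketchSerializer plain = new SketchSerializer(Collections.emptySet());
        SketchSerializer registered = new SketchSerializer(Collections.singleton("TestPojo"));

        // New serializer has registrations, old one had none: allowed.
        System.out.println(registered.canRestoreFrom(plain)); // prints true

        // The reverse direction is rejected, so the relation is not symmetric.
        System.out.println(plain.canRestoreFrom(registered)); // prints false
    }
}
```

A name such as `isCompatibleWith` suggests that the answer is the same in both directions; `canRestoreFrom` makes the direction of the check explicit.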

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6018

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3603.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3603



          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3562

          @aljoscha sure, I'll do that.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3562

          @tzulitai I would suggest you open a PR with two commits: mine and your changes on top. What do you think? Then I would close this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3562

          Here are my follow-up fixes for this: https://github.com/tzulitai/flink/tree/FLINK-6018

          The branch also includes a rebase to resolve the merge conflict, plus test fixes.
          We can continue with this after Aljoscha comes back from vacation.

          aljoscha Aljoscha Krettek added a comment -

          Changed to blocker because we need either the fix from PR https://github.com/apache/flink/pull/3562 or a revert of what we already changed.

          aljoscha Aljoscha Krettek added a comment -

          This needs a fix to KryoSerializer to properly work when restoring.

          The fix is in this PR: https://github.com/apache/flink/pull/3562

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3562

          @aljoscha I've been trying out this PR to also test some Kryo behaviours. I can address the above comments along the way and open a PR against this one

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3562#discussion_r106854612

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java —
          @@ -162,22 +176,422 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception {
          }

          @Test
          + public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
          + CheckpointStreamFactory streamFactory = createStreamFactory();
          + Environment env = new DummyEnvironment("test", 1, 0);
          + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
          +
          + // cast because our test serializer is not typed to TestPojo
          + env.getExecutionConfig()
          + .addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
          +
          + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
          +
          + // make sure that we are in fact using the KryoSerializer
          + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
          +
          + pojoType.createSerializer(env.getExecutionConfig());
          — End diff –

          Must have missed that when I cleaned up the code after experimenting ... 😓 Fixing.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3562#discussion_r106776658

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java —
          @@ -162,22 +176,422 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception {
          }

          @Test
          + public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception {
          + CheckpointStreamFactory streamFactory = createStreamFactory();
          + Environment env = new DummyEnvironment("test", 1, 0);
          + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
          +
          + // cast because our test serializer is not typed to TestPojo
          + env.getExecutionConfig()
          + .addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
          +
          + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
          +
          + // make sure that we are in fact using the KryoSerializer
          + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
          +
          + pojoType.createSerializer(env.getExecutionConfig());
          +
          + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
          +
          + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
          +
          + // backends that eagerly serializer will fail when updating state, others will
          + // fail only when performing the snapshot
          + int numExceptions = 0;
          +
          + backend.setCurrentKey(1);
          +
          + try {
          + state.update(new TestPojo("u1", 1));
          + } catch (ExpectedKryoTestException e) {
          + numExceptions++;
          + } catch (RuntimeException e) {
          + if (e.getCause() instanceof ExpectedKryoTestException) {
          + numExceptions++;
          + } else {
          + throw e;
          + }
          + }
          +
          + try {
          + runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
          + } catch (ExpectedKryoTestException e) {
          + numExceptions++;
          + }
          +
          + assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
          + }
          +
          + @Test
          + public void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception {
          + CheckpointStreamFactory streamFactory = createStreamFactory();
          + Environment env = new DummyEnvironment("test", 1, 0);
          + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
          +
          + // cast because our test serializer is not typed to TestPojo
          + env.getExecutionConfig()
          + .addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class);
          +
          + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
          +
          + // make sure that we are in fact using the KryoSerializer
          + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
          +
          + pojoType.createSerializer(env.getExecutionConfig());
          +
          + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
          +
          + ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
          + assert state instanceof InternalValueState;
          + ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
          +
          + // backends that eagerly serializer will fail when updating state, others will
          + // fail only when performing the snapshot
          + int numExceptions = 0;
          +
          + backend.setCurrentKey(1);
          +
          + try {
          + state.update(new TestPojo("u1", 1));
          + } catch (ExpectedKryoTestException e) {
          + numExceptions++;
          + } catch (RuntimeException e) {
          + if (e.getCause() instanceof ExpectedKryoTestException) {
          + numExceptions++;
          + } else {
          + throw e;
          + }
          + }
          +
          + try {
          + runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
          + } catch (ExpectedKryoTestException e) {
          + numExceptions++;
          + }
          +
          + assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
          + }
          +
          + @Test
          + public void testBackendUsesRegisteredKryoSerializer() throws Exception {
          + CheckpointStreamFactory streamFactory = createStreamFactory();
          + Environment env = new DummyEnvironment("test", 1, 0);
          + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
          +
          + env.getExecutionConfig()
          + .registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
          +
          + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
          +
          + // make sure that we are in fact using the KryoSerializer
          + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
          +
          + pojoType.createSerializer(env.getExecutionConfig());
          +
          + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
          +
          + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
          +
          + // backends that eagerly serializer will fail when updating state, others will
          + // fail only when performing the snapshot
          + int numExceptions = 0;
          +
          + backend.setCurrentKey(1);
          +
          + try {
          + state.update(new TestPojo("u1", 1));
          + } catch (ExpectedKryoTestException e) {
          + numExceptions++;
          + } catch (RuntimeException e) {
          + if (e.getCause() instanceof ExpectedKryoTestException) {
          + numExceptions++;
          + } else {
          + throw e;
          + }
          + }
          +
          + try {
          + runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
          + } catch (ExpectedKryoTestException e) {
          + numExceptions++;
          + }
          +
          + assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
          + }
          +
          + @Test
          + public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception {
          + CheckpointStreamFactory streamFactory = createStreamFactory();
          + Environment env = new DummyEnvironment("test", 1, 0);
          + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
          +
          + env.getExecutionConfig()
          + .registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class);
          +
          + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
          +
          + // make sure that we are in fact using the KryoSerializer
          + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
          +
          + pojoType.createSerializer(env.getExecutionConfig());
          +
          + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
          +
          + ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId);
          + assert state instanceof InternalValueState;
          + ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE);
          +
          + // backends that eagerly serializer will fail when updating state, others will
          + // fail only when performing the snapshot
          + int numExceptions = 0;
          +
          + backend.setCurrentKey(1);
          +
          + try {
          + state.update(new TestPojo("u1", 1));
          + } catch (ExpectedKryoTestException e) {
          + numExceptions++;
          + } catch (RuntimeException e) {
          + if (e.getCause() instanceof ExpectedKryoTestException) {
          + numExceptions++;
          + } else {
          + throw e;
          + }
          + }
          +
          + try {
          + runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
          + } catch (ExpectedKryoTestException e) {
          + numExceptions++;
          + }
          +
          + assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
          + }
          +
          +
          + /**
          + * Verify that we can restore a snapshot that was done with without registered types
          + * after registering types.
          + */
          + @Test
          + public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception {
          + CheckpointStreamFactory streamFactory = createStreamFactory();
          + Environment env = new DummyEnvironment("test", 1, 0);
          + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
          +
          + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
          +
          + // make sure that we are in fact using the KryoSerializer
          + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
          +
          + pojoType.createSerializer(env.getExecutionConfig());
          +
          + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
          +
          + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
          +
          + // make some more modifications
          + backend.setCurrentKey(1);
          + state.update(new TestPojo("u1", 1));
          +
          + backend.setCurrentKey(2);
          + state.update(new TestPojo("u2", 2));
          +
          + KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(
          + 682375462378L,
          + 2,
          + streamFactory,
          + CheckpointOptions.forFullCheckpoint()));
          +
          + backend.dispose();
          +
          + env.getExecutionConfig().registerKryoType(TestPojo.class);
          +
          + backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
          +
          + snapshot.discardState();
          +
          + state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
          + backend.setCurrentKey(1);
          + assertEquals(state.value(), new TestPojo("u1", 1));
          +
          + backend.setCurrentKey(2);
          + assertEquals(state.value(), new TestPojo("u2", 2));
          +
          + backend.dispose();
          + }
          +
          + /**
          + * Verify that we can restore a snapshot that was done with without registered default
          — End diff –

          "with without" redundancy --> "without", I think ?

+ */ + @Test + public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception { + CheckpointStreamFactory streamFactory = createStreamFactory(); + Environment env = new DummyEnvironment("test", 1, 0); + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); + + // make sure that we are in fact using the KryoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); + + pojoType.createSerializer(env.getExecutionConfig()); + + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); + + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + // make some more modifications + backend.setCurrentKey(1); + state.update(new TestPojo("u1", 1)); + + backend.setCurrentKey(2); + state.update(new TestPojo("u2", 2)); + + KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot( + 682375462378L, + 2, + streamFactory, + CheckpointOptions.forFullCheckpoint())); + + backend.dispose(); + + env.getExecutionConfig().registerKryoType(TestPojo.class); + + backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); + + snapshot.discardState(); + + state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + backend.setCurrentKey(1); + assertEquals(state.value(), new TestPojo("u1", 1)); + + backend.setCurrentKey(2); + assertEquals(state.value(), new TestPojo("u2", 2)); + + backend.dispose(); + } + + /** + * Verify that we can restore a snapshot that was done with without registered default — End diff – "with without" redundancy --> "without", I think ?
          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/3562

          +1 to the rename. I was thinking the same here

          githubbot ASF GitHub Bot added a comment -

          GitHub user aljoscha opened a pull request:

          https://github.com/apache/flink/pull/3562

          FLINK-6018 Add tests for KryoSerializer restore with registered types

          This is the result of the discussion in #3534.

          I changed `TypeSerializer.isCompatibleWith()` to `TypeSerializer.canRestoreFrom` because the relation is not necessarily symmetric.

          I added a `KryoSerializer.canRestoreFrom()` that only allows restoring when we previously didn't have registered types/serializers.

          I added a whole bunch of tests in `StateBackendTestBase`; this should be reviewed most thoroughly.

          R: @StephanEwen and @tzulitai because this probably is interesting with the serialiser update story that you're working on.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/aljoscha/flink jira-6018-state-init-fixups

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3562.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3562


          commit f0c3af53d24a3eac914cf1ceb3b1761a40553dfe
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-03-16T14:17:05Z

          FLINK-6018 Add tests for KryoSerializer restore with registered types

          commit b90cf5cad5176d8edcbd189a9b65cc4999cddd53
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-03-17T09:56:13Z

          FLINK-6018 Rename isCompatibleWith() to canRestoreFrom()

          This makes the method asymmetric because in the case of KryoSerializer we
          can restore from state that was stored using no registered
          types/serializers while the other way around is not possible.
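
          The asymmetry described in this commit message can be illustrated with a minimal, self-contained sketch. It does not use Flink or Kryo: `SketchSerializer`, its `registeredTypes` field, and the driver class are hypothetical stand-ins, and treating identical registrations as restorable is an assumption added for illustration (the PR text only states that restoring is allowed when nothing was registered previously).

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Entry point first so the file can be launched directly as a single source file.
class CanRestoreFromSketch {
    public static void main(String[] args) {
        SketchSerializer plain = new SketchSerializer(Collections.emptySet());
        SketchSerializer registered = new SketchSerializer(Collections.singleton("TestPojo"));

        // New serializer has registrations, old state was written without any.
        System.out.println(registered.canRestoreFrom(plain)); // true
        // The other way around is rejected.
        System.out.println(plain.canRestoreFrom(registered)); // false
    }
}

// Hypothetical stand-in for a Kryo-backed serializer; NOT Flink's KryoSerializer.
final class SketchSerializer {
    private final Set<String> registeredTypes;

    SketchSerializer(Set<String> registeredTypes) {
        this.registeredTypes = Collections.unmodifiableSet(new LinkedHashSet<>(registeredTypes));
    }

    /** Returns true if this (restoring) serializer may read state written by {@code previous}. */
    boolean canRestoreFrom(SketchSerializer previous) {
        // From the PR description: restoring is allowed when nothing was registered before.
        // Assumption for this sketch: identical registrations are also accepted.
        return previous.registeredTypes.isEmpty()
                || previous.registeredTypes.equals(this.registeredTypes);
    }
}
```

          Because `a.canRestoreFrom(b)` and `b.canRestoreFrom(a)` can differ, a symmetric name such as `isCompatibleWith()` would be misleading, which is the stated motivation for the rename.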


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3534

          @aljoscha Can you share https://github.com/aljoscha/flink/tree/jira-6018-state-init-fixups with @tzulitai who is looking into serializer upgrade paths?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3534

          This change is probably uncritical:

          • It seems this code path was never executed (by chance) because the `RuntimeContext` pre-initializes state descriptors, and all the operators directly supply serializers to the descriptors.
          • As @aljoscha mentioned, in the current case of the KryoSerializer, adding more configuration is not a problem.

          Okay, +1, this seems safe.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3534

          @StephanEwen you're right, this is problematic. The funny thing is that this actually works for Kryo. State written by a serialiser without any registered types/serializers can be read when the restoring serialiser does have registered types/serializers. It probably does not work the other way round. I started implementing the fixup and tests here: https://github.com/aljoscha/flink/tree/jira-6018-state-init-fixups

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3534

          Hi @StephanEwen, thanks for your attention to this JIRA. This issue is only a code cleanup; in fact, it is not a bug. While working on FLINK-5995 (https://issues.apache.org/jira/browse/FLINK-5995), I got an exception when using `DefaultOperatorStateBackend#getOperatorState`. I then checked `AbstractKeyedStateBackend#getPartitionedState` and found the duplicated check, as described in this JIRA.
          About `AbstractKeyedStateBackend#getOrCreateKeyedState`, I'm not sure, but I suggest doing the same thing as in `getPartitionedState`, because I checked that `WindowOperator`'s `windowStateDescriptor` also uses `input.getType().createSerializer(getExecutionEnvironment().getConfig())` to initialize the `typeSerializer`.
          What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3534

          This change may alter the format of savepoints, because it now forwards type-registrations to the Kryo serializer, which it did not do before.

          Since we announced savepoint compatibility, we need to understand when this would happen. Is it something that could not really happen before (because whenever the method was called, the serializer was properly initialized outside), or did this actually occur in some cases?

          @sunjincheng121 Can you share how you stumbled across this bug? Was it a code cleanup, or a bug you encountered while running Flink?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3534

          There is also another method in `AbstractKeyedStateBackend`: `getOrCreateKeyedState` which does not silently use an empty `ExecutionConfig` but throws an exception. That method could also use the same execution config, now that it is available.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3534

          I think this could have really used a test. It is a tricky bug that will easily be re-introduced without a test that guards the fix.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3534

          Thanks for your contribution! 😃 I just merged.

          Could you please close this PR?

          aljoscha Aljoscha Krettek added a comment -

          Fixed on master in
          264f6df8e0c0fb2f9dfb0cd9beab9d380dc8e00c

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3534

          Hi @aljoscha, thank you for your attention to this JIRA.
          Best,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3534

          This looks good! I'll wait for travis and then merge.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3534

          Hi all, anyone is welcome to review this PR.
          Best,
          SunJincheng

          sunjincheng121 sunjincheng added a comment -

          Thanks Aljoscha Krettek, I have opened the PR. Feel free to tell me if anything is incorrect.
          Best,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3534

          FLINK-6018 [statebackend] Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [×] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [×] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-6018-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3534.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3534


          commit 0119c7238d2057d1acd682a1a3cec815169f0f36
          Author: 金竹 <jincheng.sunjc@alibaba-inc.com>
          Date: 2017-03-14T14:26:53Z

          FLINK-6018[statebackend] Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()


          aljoscha Aljoscha Krettek added a comment -

          sunjincheng Sure, please go ahead!

          sunjincheng121 sunjincheng added a comment -

          Hi Wenlong Lyu and Aljoscha Krettek, thank you for your attention to this issue. I agree with Aljoscha Krettek that approach #2 makes our program more robust, so I would like to open a PR using approach #2.
          Best,
          SunJincheng

          aljoscha Aljoscha Krettek added a comment -

          I would like to suggest approach #2 with adding the ExecutionConfig to the AbstractKeyedStateBackend. Not all access to state necessarily goes through a KeyedStateStore so it would be good to have the initialisation logic there as a failsafe.
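
          A rough sketch of approach #2, under stated assumptions: the classes below are hypothetical stand-ins (`SketchConfig`, `SketchDescriptor`, `SketchBackend`), not Flink's `ExecutionConfig`, `StateDescriptor`, or `AbstractKeyedStateBackend`, and a string stands in for the serializer. The idea shown is only that the backend keeps the job's configuration and uses it as a failsafe when a descriptor arrives uninitialised, instead of creating an empty configuration on the spot.

```java
import java.util.Objects;

// Entry point first so the file can be launched directly as a single source file.
class LazyInitSketch {
    public static void main(String[] args) {
        SketchBackend backend = new SketchBackend(new SketchConfig("job-config"));

        // Descriptor that reached the backend without going through a KeyedStateStore.
        SketchDescriptor fresh = new SketchDescriptor();
        System.out.println(backend.getPartitionedState(fresh)); // serializer[job-config]

        // Descriptor that was already initialised elsewhere keeps its serializer.
        SketchDescriptor preset = new SketchDescriptor();
        preset.initializeSerializerUnlessSet(new SketchConfig("runtime-context"));
        System.out.println(backend.getPartitionedState(preset)); // serializer[runtime-context]
    }
}

// Hypothetical stand-in for the job's execution configuration.
final class SketchConfig {
    final String label;

    SketchConfig(String label) {
        this.label = Objects.requireNonNull(label);
    }
}

// Hypothetical stand-in for a state descriptor; a String plays the role of the serializer.
final class SketchDescriptor {
    private String serializer; // null until initialised

    void initializeSerializerUnlessSet(SketchConfig config) {
        // The null check lives here, so callers do not need a second guard.
        if (serializer == null) {
            serializer = "serializer[" + config.label + "]";
        }
    }

    String getSerializer() {
        return serializer;
    }
}

// Hypothetical stand-in for the keyed state backend holding the job's config.
final class SketchBackend {
    private final SketchConfig executionConfig;

    SketchBackend(SketchConfig executionConfig) {
        this.executionConfig = Objects.requireNonNull(executionConfig);
    }

    String getPartitionedState(SketchDescriptor descriptor) {
        // Failsafe: initialise with the real config rather than a freshly created empty one.
        descriptor.initializeSerializerUnlessSet(executionConfig);
        return descriptor.getSerializer();
    }
}
```

          In this sketch the fallback serializer is derived from the same configuration the rest of the job uses, so any type registrations in that configuration would be visible to it.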

          wenlong.lwl Wenlong Lyu added a comment -

          I think we should just throw an exception since all initialization has been done in KeyedStateStore.


            People

            • Assignee: Aljoscha Krettek
            • Reporter: sunjincheng