FLINK-6775

StateDescriptor cannot be shared by multiple subtasks

    Details

      Description

      The StateDescriptor holds the TypeSerializer that is used to serialize the state, and it hands out that same serializer instance on every access without duplicating it. As a result, a StateDescriptor cannot safely be shared if its TypeSerializer is stateful, as is the case with the KryoSerializer.

      This problem can easily arise when a user defines a stateful operator that declares its StateDescriptor as a static field, because all subtasks running in the same JVM then share one serializer instance. The workaround is not to declare the StateDescriptor as static. However, I would still make this a blocker, because failures caused by concurrent use of the TypeSerializer are extremely hard for users to debug.

      The following operator produces the problem:

      private static final class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> implements CheckpointedFunction {
          private static final long serialVersionUID = -1175717056869107847L;

          // Static descriptor: shared by every StatefulMapper instance (subtask) in the same JVM.
          private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE = new ValueStateDescriptor<>("pojoType", PojoType.class);

          private transient ValueState<PojoType> valueState;

          public StatefulMapper() {
              valueState = null;
          }

          @Override
          public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
              PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2, 2.0));
              // Each update serializes through the serializer obtained from the shared descriptor.
              valueState.update(pojoType);
              return tuple;
          }

          @Override
          public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {}

          @Override
          public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
              valueState = functionInitializationContext.getKeyedStateStore().getState(POJO_VALUE_STATE);
          }
      }
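
The workaround of not using a static descriptor can be illustrated with a small model that runs without Flink on the classpath. This is only a sketch: `ScratchSerializer`, `NaiveDescriptor`, `SharedDescriptorMapper` and `PerInstanceDescriptorMapper` are hypothetical stand-ins, not Flink classes, and `NaiveDescriptor` merely imitates the behavior described in this issue (handing out its one serializer without duplicating it).

```java
/** Hypothetical stand-in for a stateful serializer such as the KryoSerializer. */
final class ScratchSerializer {
    // Mutable scratch space is what makes concurrent use of one instance unsafe.
    private final byte[] scratch = new byte[64];
}

/** Hypothetical descriptor imitating the reported behavior: no duplication on access. */
final class NaiveDescriptor {
    private final ScratchSerializer serializer = new ScratchSerializer();

    ScratchSerializer getSerializer() {
        return serializer; // every caller receives the same instance
    }
}

/** Problematic pattern: one static descriptor shared by all instances in the JVM. */
final class SharedDescriptorMapper {
    private static final NaiveDescriptor DESCRIPTOR = new NaiveDescriptor();

    private ScratchSerializer serializer;

    void initializeState() {
        serializer = DESCRIPTOR.getSerializer();
    }

    ScratchSerializer serializer() {
        return serializer;
    }
}

/** Workaround: each instance builds its own descriptor when it is initialized. */
final class PerInstanceDescriptorMapper {
    private ScratchSerializer serializer;

    void initializeState() {
        NaiveDescriptor descriptor = new NaiveDescriptor();
        serializer = descriptor.getSerializer();
    }

    ScratchSerializer serializer() {
        return serializer;
    }
}
```

In this model, two initialized `SharedDescriptorMapper` instances end up holding the same `ScratchSerializer`, while two `PerInstanceDescriptorMapper` instances each hold their own. Applied to the operator above, the equivalent change would presumably be to create the `ValueStateDescriptor` inside `initializeState` or as a non-static field.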
      

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/4025

          FLINK-6775 [state] Duplicate StateDescriptor's serializer

          Duplicate the TypeSerializer before returning it from the StateDescriptor. That way
          we ensure that StateDescriptors can be shared by multiple threads.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixStateDescriptor

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4025.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4025


          commit 16cb7b12e9d8a59ad957c206f03076fca2143b71
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-31T11:59:55Z

          FLINK-6775 [state] Duplicate StateDescriptor's serializer

          Duplicate the TypeSerializer before returning it from the StateDescriptor. That way
          we ensure that StateDescriptors can be shared by multiple threads.
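
The idea behind the change, duplicating the serializer before handing it out, can be sketched roughly as follows. This is not the code from the pull request: `SketchSerializer`, `StatelessStringSerializer`, `BufferedStringSerializer` and `DuplicatingDescriptor` are hypothetical names used for illustration, and only the `duplicate()` method is modeled on `TypeSerializer`. The sketch assumes that a stateless serializer may return itself from `duplicate()`, whereas a stateful one returns a fresh copy.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified serializer contract; only duplicate() mirrors TypeSerializer. */
interface SketchSerializer<T> {
    /** Returns an instance that is safe to use from another thread. */
    SketchSerializer<T> duplicate();

    byte[] serialize(T value);
}

/** Stateless: holds no mutable fields, so duplicate() can return this. */
final class StatelessStringSerializer implements SketchSerializer<String> {
    @Override
    public SketchSerializer<String> duplicate() {
        return this;
    }

    @Override
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}

/** Stateful: reuses a scratch buffer, so duplicate() creates a fresh instance. */
final class BufferedStringSerializer implements SketchSerializer<String> {
    private final StringBuilder scratch = new StringBuilder();

    @Override
    public SketchSerializer<String> duplicate() {
        return new BufferedStringSerializer();
    }

    @Override
    public byte[] serialize(String value) {
        scratch.setLength(0);
        scratch.append(value);
        return scratch.toString().getBytes(StandardCharsets.UTF_8);
    }
}

/** Hypothetical descriptor that calls duplicate() on every access. */
final class DuplicatingDescriptor<T> {
    private final SketchSerializer<T> serializer;

    DuplicatingDescriptor(SketchSerializer<T> serializer) {
        this.serializer = serializer;
    }

    SketchSerializer<T> getSerializer() {
        // The descriptor always duplicates; the serializer decides whether a copy is needed.
        return serializer.duplicate();
    }
}
```

With this shape, a descriptor wrapping `BufferedStringSerializer` returns a distinct instance per call, so a descriptor shared between threads no longer implies a shared serializer. A descriptor wrapping `StatelessStringSerializer` keeps returning the same instance, which is harmless because that instance has no mutable state.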


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4025#discussion_r119340007

          --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java ---
          @@ -101,4 +108,35 @@ public void testValueStateDescriptorAutoSerializer() throws Exception {
                   assertNotNull(copy.getElementSerializer());
                   assertEquals(StringSerializer.INSTANCE, copy.getElementSerializer());
               }
          +
          +    /**
          +     * FLINK-6775
          +     *
          +     * Tests that the returned serializer is duplicated if it is stateful. This allows to
          --- End diff --

          I think the wording could be a bit misleading: the serializer is not only duplicated when it is stateful, it is always duplicated. (This also holds for the other tests.)

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4025#discussion_r119340919

          --- Diff: flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java ---
          @@ -101,4 +108,35 @@ public void testValueStateDescriptorAutoSerializer() throws Exception {
                   assertNotNull(copy.getElementSerializer());
                   assertEquals(StringSerializer.INSTANCE, copy.getElementSerializer());
               }
          +
          +    /**
          +     * FLINK-6775
          +     *
          +     * Tests that the returned serializer is duplicated if it is stateful. This allows to
          --- End diff --

          Will change it.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/4025

          Thanks for the quick review, @aljoscha. Yes, my filter condition was `*StateDescriptorTest`. I will add a test for the `AggregatingStateDescriptor`.

          githubbot ASF GitHub Bot added a comment -

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4025

          LGTM, really like the change on the `testCorrectClassLoaderUsedOnSnapshot` test!

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/4025

          Thanks for the review @tzulitai and @aljoscha. Failing test case is unrelated. Merging this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4025

          till.rohrmann Till Rohrmann added a comment -

          1.4.0: 88ffad272eea5865cc43bf44a8980754d8711178
          1.3.1: d0e417e51fb7f29adfbb8779ceee7c01a9cdc7c7
          1.2.2: 6f482aeb36f79a8059be1a2350e6d049cf2020e5


            People

            • Assignee: Till Rohrmann
            • Reporter: Till Rohrmann
            • Votes: 0
            • Watchers: 5
