Flink / FLINK-6808

Stream join fails when checkpointing is enabled

    Details

      Description

      Joining two streams fails in Flink 1.3.0 when checkpointing is enabled; the same job worked with 1.2. The following job reproduces the failure:

      Example
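      import org.apache.flink.api.common.functions.JoinFunction;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;

      // Class name added only so the example compiles on its own.
      public class StreamJoinCheckpointIssue {

          public static void main(String[] args) throws Exception {

              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // enable checkpointing every 5000 ms
              env.enableCheckpointing(5000);

              // create two bounded sequence streams whose ranges overlap (2000..5000)
              DataStreamSource<Long> one = env.generateSequence(0, 5000);
              DataStreamSource<Long> two = env.generateSequence(2000, 15000);

              // map both to strings, sleeping 1 s per element so the job runs long enough for a checkpoint to trigger
              DataStream<String> oneProcessed = one
                      .map(oneValue -> {
                          Thread.sleep(1000);
                          return "val-" + oneValue;
                      });
              DataStream<String> twoProcessed = two
                      .map(twoValue -> {
                          Thread.sleep(1000);
                          return "val-" + twoValue;
                      });

              // join the two streams on string equality within 5-second processing-time windows
              DataStream<String> joinedStreams = oneProcessed
                      .join(twoProcessed)
                      .where(String::toString)
                      .equalTo(String::toString)
                      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                      .apply(new JoinFunction<String, String, String>() {
                          @Override
                          public String join(String oneValue, String twoValue) {
                              // nothing relevant here; just concatenate the two values
                              return oneValue + "+" + twoValue;
                          }
                      });

              // print the join results
              joinedStreams.print();

              env.execute("Issue with stream join and checkpoints");
          }
      }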
      

      Stack trace:

      java.lang.Exception: Could not perform checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not complete snapshot 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
      	... 8 more
      Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
      	at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
      	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
      	at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
      	at org.apache.flink.runtime.state.ArrayListSerializer.snapshotConfiguration(ArrayListSerializer.java:149)
      	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
      	... 13 more
      

          Activity

          Aljoscha Krettek (aljoscha) added a comment:

          Tzu-Li (Gordon) Tai, could you please have a look? I think the problem is that the UnionSerializer is in fact used in managed state because it is used as the serialiser for the window contents in the underlying WindowOperator. What do you think?
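Aljoscha's diagnosis matches the stack trace: the window contents live in list state whose serializer (`ArrayListSerializer`) builds its config snapshot from its element serializer, which here is `CoGroupedStreams.UnionSerializer`, and that call throws. The sketch below is a simplified, self-contained model of that call chain, not Flink code; `Serializer`, `UnregisteredSerializer`, `ListSerializer` and `snapshotStateMetaInfo` are hypothetical stand-ins, and string snapshots replace Flink's real config snapshot objects.

```java
// Hypothetical, simplified model of the failing call chain in the stack trace.
// None of these types are Flink classes.
public class NestedSnapshotModel {

    // Minimal stand-in for a serializer that can describe its own configuration.
    interface Serializer {
        String snapshotConfiguration();
    }

    // Stand-in for UnionSerializer in 1.3.0: it assumed it would never back managed state.
    static final class UnregisteredSerializer implements Serializer {
        @Override
        public String snapshotConfiguration() {
            throw new UnsupportedOperationException(
                    "This serializer is not registered for managed state.");
        }
    }

    // Stand-in for ArrayListSerializer: its snapshot embeds the element serializer's snapshot.
    static final class ListSerializer implements Serializer {
        private final Serializer element;

        ListSerializer(Serializer element) {
            this.element = element;
        }

        @Override
        public String snapshotConfiguration() {
            return "List(" + element.snapshotConfiguration() + ")";
        }
    }

    // Stand-in for the state backend writing state meta info during a checkpoint.
    // Unlike Flink, it reports the failure as a string instead of propagating it.
    static String snapshotStateMetaInfo(Serializer stateSerializer) {
        try {
            return "ok: " + stateSerializer.snapshotConfiguration();
        } catch (UnsupportedOperationException e) {
            return "checkpoint failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Window contents: a list whose elements use the non-snapshottable serializer.
        Serializer windowContents = new ListSerializer(new UnregisteredSerializer());
        // prints "checkpoint failed: This serializer is not registered for managed state."
        System.out.println(snapshotStateMetaInfo(windowContents));

        // The same list around a snapshottable element serializer succeeds.
        // prints "ok: List(Long)"
        System.out.println(snapshotStateMetaInfo(new ListSerializer(() -> "Long")));
    }
}
```

The point of the model is that the outer serializer cannot snapshot itself without snapshotting every nested serializer, so a single nested serializer that refuses is enough to fail the whole checkpoint.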

          Tzu-Li (Gordon) Tai (tzulitai) added a comment:

          Ah, I see. This definitely should be fixed then. Will make it a blocker for 1.3.1.

          ASF GitHub Bot added a comment:

          GitHub user tzulitai opened a pull request:

          https://github.com/apache/flink/pull/4052

          FLINK-6808 Implement snapshotConfiguration/ensureCompatibility for CoGroupedStreams.UnionSerializer

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tzulitai/flink FLINK-6808

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4052.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4052


          commit bbf3dd13157610f35a6f938008cde726be201474
          Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
          Date: 2017-06-02T13:15:32Z

          FLINK-6808 Implement snapshotConfiguration/ensureCompatibility for CoGroupedStreams.UnionSerializer
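The PR title names the two methods that were missing. As a rough, hypothetical model of what they have to do for a serializer wrapping two nested serializers: the snapshot records both nested configurations, and a restored snapshot is compatible only if each nested serializer accepts its own part. This is not the code from PR #4052; in Flink 1.3, `ensureCompatibility` returns a `CompatibilityResult` rather than a boolean, and the `|`-separated string encoding here is an assumption made to keep the sketch self-contained.

```java
// Hypothetical, simplified model of snapshotConfiguration/ensureCompatibility for a
// serializer that wraps two nested serializers. Not Flink's API and not the actual patch.
public class UnionSnapshotModel {

    interface Serializer {
        String snapshotConfiguration();

        // True if data written under `snapshot` can be read by this serializer.
        boolean ensureCompatibility(String snapshot);
    }

    // A leaf serializer identified by a fixed type tag.
    static final class LeafSerializer implements Serializer {
        private final String typeTag;

        LeafSerializer(String typeTag) {
            this.typeTag = typeTag;
        }

        @Override
        public String snapshotConfiguration() {
            return typeTag;
        }

        @Override
        public boolean ensureCompatibility(String snapshot) {
            return typeTag.equals(snapshot);
        }
    }

    // Composite serializer: the snapshot holds both nested snapshots,
    // and it is compatible only if both nested serializers are.
    static final class UnionSerializer implements Serializer {
        private final Serializer one;
        private final Serializer two;

        UnionSerializer(Serializer one, Serializer two) {
            this.one = one;
            this.two = two;
        }

        @Override
        public String snapshotConfiguration() {
            // Assumed encoding; only valid while nested snapshots contain no '|'.
            return one.snapshotConfiguration() + "|" + two.snapshotConfiguration();
        }

        @Override
        public boolean ensureCompatibility(String snapshot) {
            String[] parts = snapshot.split("\\|", -1);
            return parts.length == 2
                    && one.ensureCompatibility(parts[0])
                    && two.ensureCompatibility(parts[1]);
        }
    }

    public static void main(String[] args) {
        Serializer union = new UnionSerializer(new LeafSerializer("String"), new LeafSerializer("String"));
        String snapshot = union.snapshotConfiguration();
        System.out.println(snapshot);                              // String|String
        System.out.println(union.ensureCompatibility(snapshot));   // true

        Serializer changed = new UnionSerializer(new LeafSerializer("String"), new LeafSerializer("Long"));
        System.out.println(changed.ensureCompatibility(snapshot)); // false
    }
}
```

Delegating to the nested serializers keeps the composite's answer consistent with theirs: if either join input's type changes incompatibly, restoring the window state is rejected.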


          ASF GitHub Bot added a comment:

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4052#discussion_r119867402

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java ---
          @@ -77,7 +82,7 @@
          private final DataStream<T2> input2;

          /**
          - * Creates new CoGroped data streams, which are the first step towards building a streaming
          --- End diff --

          Groped ... 😅

          ASF GitHub Bot added a comment:

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4052#discussion_r119867943

          --- Diff: flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java ---
          @@ -324,6 +329,41 @@ public void invoke(String value) throws Exception

          { Assert.assertEquals(expectedResult, testResults); }

          + @Test
          + public void testCoGroupOperatorWithCheckpoint() throws Exception {
          --- End diff --

          Maybe mention that this is meant to check the snapshotting and the problem that was previously undetected.

          ASF GitHub Bot added a comment:

          Github user tzulitai commented on the issue:

          https://github.com/apache/flink/pull/4052

          Thanks for the review 😄
          There was a checkstyle violation; will merge after another Travis run.

          ASF GitHub Bot added a comment:

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4052

          Tzu-Li (Gordon) Tai (tzulitai) added a comment:

          Fixed for 1.3.1 via f74caf7062b1cc23a704f8f8b8171be430b60807.
          Fixed for master via 539787b21822eb839d0408a989cd541450bd08d2.


            People

            • Assignee:
              Tzu-Li (Gordon) Tai (tzulitai)
              Reporter:
              Francisco Rosa (fassisrosa@hotmail.com)
            • Votes:
              0
              Watchers:
              5
