Flink / FLINK-6808

Stream join fails when checkpointing is enabled


Details

    Description

      Joining two streams fails in 1.3.0 when checkpointing is enabled. The same job worked in the previous 1.2 version. The following example reproduces the failure:

      Example
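          import org.apache.flink.api.common.functions.JoinFunction;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.datastream.DataStreamSource;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
          import org.apache.flink.streaming.api.windowing.time.Time;

          // Wrapper class added so the snippet compiles on its own; the name is illustrative.
          public class StreamJoinCheckpointIssue {

              public static void main(String[] args) throws Exception {

                  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  // enable checkpoints, one every 5 seconds
                  env.enableCheckpointing(5000);

                  // create two streams with overlapping value ranges
                  DataStreamSource<Long> one = env.generateSequence(0, 5000);
                  DataStreamSource<Long> two = env.generateSequence(2000, 15000);

                  // process both, delaying each element to make sure a checkpoint will happen
                  DataStream<String> oneProcessed = one
                          .map(oneValue -> {
                              Thread.sleep(1000);
                              return "val-" + oneValue;
                          });
                  DataStream<String> twoProcessed = two
                          .map(twoValue -> {
                              Thread.sleep(1000);
                              return "val-" + twoValue;
                          });

                  // join the two streams on string equality
                  DataStream<String> joinedStreams = oneProcessed
                          .join(twoProcessed)
                          .where(String::toString)
                          .equalTo(String::toString)
                          .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                          .apply(new JoinFunction<String, String, String>() {
                              @Override
                              public String join(String oneValue, String twoValue) {
                                  // nothing really relevant, just concatenate the strings
                                  return oneValue + "+" + twoValue;
                              }
                          });

                  // output results
                  joinedStreams.print();

                  env.execute("Issue with stream join and checkpoints");
              }
          }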
      

      Stack trace:

      java.lang.Exception: Could not perform checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not complete snapshot 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
      	... 8 more
      Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
      	at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
      	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
      	at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
      	at org.apache.flink.runtime.state.ArrayListSerializer.snapshotConfiguration(ArrayListSerializer.java:149)
      	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
      	... 13 more
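
      The innermost cause in the trace suggests a chain: the list serializer of the window state builds its configuration snapshot from its element serializer, and the element serializer (CoGroupedStreams$UnionSerializer) rejects the call. The sketch below models only that chain in plain Java. All class and method names in it (SketchSerializer, ListLikeSerializer, UnionLikeSerializer, StringLikeSerializer, tryCheckpoint) are hypothetical stand-ins, not Flink's classes, and the sketch makes no claim about how Flink is actually implemented beyond what the stack trace shows.

```java
public class NestedSerializerSnapshotSketch {

    /** Hypothetical stand-in for a serializer that can describe its own configuration. */
    interface SketchSerializer {
        String snapshotConfiguration();
    }

    /** Models an ordinary element serializer that supports configuration snapshots. */
    static final class StringLikeSerializer implements SketchSerializer {
        @Override
        public String snapshotConfiguration() {
            return "string";
        }
    }

    /** Models an element serializer that refuses snapshots, like the one in the trace. */
    static final class UnionLikeSerializer implements SketchSerializer {
        @Override
        public String snapshotConfiguration() {
            throw new UnsupportedOperationException(
                    "This serializer is not registered for managed state.");
        }
    }

    /** Models a list serializer whose snapshot is built from its element serializer's snapshot. */
    static final class ListLikeSerializer implements SketchSerializer {
        private final SketchSerializer elementSerializer;

        ListLikeSerializer(SketchSerializer elementSerializer) {
            this.elementSerializer = elementSerializer;
        }

        @Override
        public String snapshotConfiguration() {
            // The nested call is where an unsupported element serializer surfaces.
            return "list[" + elementSerializer.snapshotConfiguration() + "]";
        }
    }

    /** Models the checkpoint step: snapshot the state serializer and report the outcome. */
    static String tryCheckpoint(SketchSerializer stateSerializer) {
        try {
            return "OK: " + stateSerializer.snapshotConfiguration();
        } catch (UnsupportedOperationException e) {
            return "FAILED: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCheckpoint(new ListLikeSerializer(new StringLikeSerializer())));
        System.out.println(tryCheckpoint(new ListLikeSerializer(new UnionLikeSerializer())));
    }
}
```

      In this model the first call prints "OK: list[string]" and the second prints "FAILED: This serializer is not registered for managed state.", which mirrors why the failure only appears once a checkpoint is taken.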
      

          People

            tzulitai Tzu-Li (Gordon) Tai
            fassisrosa@hotmail.com Francisco Rosa