Flink / FLINK-6808

Stream join fails when checkpointing is enabled


    Details

      Description

      Joining two streams fails in Flink 1.3.0 when checkpointing is enabled. The same job worked in the previous 1.2 version. The following example reproduces the failure:

      Example
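          import org.apache.flink.api.common.functions.JoinFunction;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.datastream.DataStreamSource;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
          import org.apache.flink.streaming.api.windowing.time.Time;

          // The class name is illustrative; the original report only showed the main method.
          public class StreamJoinCheckpointIssue {

              public static void main(String[] args) throws Exception {

                  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  // enable checkpoints, one every 5 seconds
                  env.enableCheckpointing(5000);

                  // create two streams with overlapping ranges
                  DataStreamSource<Long> one = env.generateSequence(0, 5000);
                  DataStreamSource<Long> two = env.generateSequence(2000, 15000);

                  // process both; the sleep slows processing so that a checkpoint happens while the job runs
                  DataStream<String> oneProcessed = one
                          .map(oneValue -> {
                              Thread.sleep(1000);
                              return "val-" + oneValue;
                          });
                  DataStream<String> twoProcessed = two
                          .map(twoValue -> {
                              Thread.sleep(1000);
                              return "val-" + twoValue;
                          });

                  // join the two streams on string equality, in 5-second processing-time windows
                  DataStream<String> joinedStreams = oneProcessed
                          .join(twoProcessed)
                          .where(String::toString)
                          .equalTo(String::toString)
                          .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                          .apply(new JoinFunction<String, String, String>() {
                              @Override
                              public String join(String oneValue, String twoValue) {
                                  // nothing really relevant, just concatenate the strings
                                  return oneValue + "+" + twoValue;
                              }
                          });

                  // output results
                  joinedStreams.print();

                  env.execute("Issue with stream join and checkpoints");
              }
          }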
      

      Stack trace:

      java.lang.Exception: Could not perform checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not complete snapshot 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
      	... 8 more
      Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
      	at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
      	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
      	at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
      	at org.apache.flink.runtime.state.ArrayListSerializer.snapshotConfiguration(ArrayListSerializer.java:149)
      	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
      	... 13 more
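
      Reading the trace from the bottom up: the list serializer for the window state builds its configuration snapshot by asking its nested element serializer for one, and that nested serializer (CoGroupedStreams$UnionSerializer) throws UnsupportedOperationException. The failure is then wrapped twice on the way up to the checkpoint trigger. The sketch below is only a simplified model of that propagation, not Flink code: every type and method name in it is hypothetical and merely mirrors the roles seen in the trace.

```java
// Simplified, hypothetical model of the call chain in the stack trace.
// None of these types are Flink classes; the names only mirror the roles involved.
class SnapshotFailureModel {

    // Stands in for a serializer that is asked for a config snapshot during a checkpoint.
    interface ModelSerializer {
        String snapshotConfiguration();
    }

    // Plays the role of the union serializer from the trace: it refuses to be snapshotted.
    static class ModelUnionSerializer implements ModelSerializer {
        @Override
        public String snapshotConfiguration() {
            throw new UnsupportedOperationException(
                    "This serializer is not registered for managed state.");
        }
    }

    // Plays the role of the list serializer that wraps the element serializer.
    static class ModelListSerializer implements ModelSerializer {
        private final ModelSerializer elementSerializer;

        ModelListSerializer(ModelSerializer elementSerializer) {
            this.elementSerializer = elementSerializer;
        }

        @Override
        public String snapshotConfiguration() {
            // A composite snapshot needs the nested snapshot, so a nested failure propagates.
            return "list[" + elementSerializer.snapshotConfiguration() + "]";
        }
    }

    // Mimics the two layers of wrapping visible in the trace.
    static void performCheckpoint(ModelSerializer stateSerializer) throws Exception {
        try {
            try {
                stateSerializer.snapshotConfiguration();
            } catch (Exception e) {
                throw new Exception("Could not complete snapshot", e);
            }
        } catch (Exception e) {
            throw new Exception("Could not perform checkpoint", e);
        }
    }

    // Walks the "Caused by" chain down to the innermost exception.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        try {
            performCheckpoint(new ModelListSerializer(new ModelUnionSerializer()));
        } catch (Exception e) {
            Throwable root = rootCause(e);
            System.out.println(root.getClass().getSimpleName() + ": " + root.getMessage());
        }
    }
}
```

      In this model, the outermost exception says only that the checkpoint could not be performed; the actionable message sits at the root of the cause chain, as in the trace.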
      

            People

            • Assignee: Tzu-Li (Gordon) Tai (tzulitai)
            • Reporter: Francisco Rosa (fassisrosa@hotmail.com)
