Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6808

Stream join fails when checkpointing is enabled

    XMLWordPrintableJSON

Details

    Description

      The combination of joining streams and checkpointing fails in 1.3.0. It used to work with the previous 1.2 version. Code example for failure:

      Example
          public static void main(String[] args) throws Exception {
      
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // enable checkpoints
              env.enableCheckpointing(5000);
      
              // create two streams
              DataStreamSource<Long> one = env.generateSequence(0, 5000);
              DataStreamSource<Long> two = env.generateSequence(2000, 15000);
      
              // process both, provide a delay to make sure checkpoint will happen
              DataStream<String> oneProcessed = one.
                      map(oneValue -> {
                          Thread.sleep(1000);
                          return "val-" + oneValue;
                      });
              DataStream<String> twoProcessed = two.
                      map(oneValue -> {
                          Thread.sleep(1000);
                          return "val-" + oneValue;
                      });
      
              // join the two streams, join on string match
              DataStream<String> joinedStreams = oneProcessed.
                      join(twoProcessed).
                      where(String::toString).
                      equalTo(String::toString).
                      window(TumblingProcessingTimeWindows.of(Time.seconds(5))).
                      apply(new JoinFunction<String, String, String>() {
                          @Override
                          public String join(String oneValue, String twoValue) {
                              // nothing really relevant, just concatenate string
                              return oneValue + "+" + twoValue;
                          }
                      });
      
              // output results
              joinedStreams.print();
      
              env.execute("Issue with stream join and checkpoints");
          }
      

      Stack trace:

      java.lang.Exception: Could not perform checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not complete snapshot 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
      	... 8 more
      Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
      	at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
      	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
      	at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
      	at org.apache.flink.runtime.state.ArrayListSerializer.snapshotConfiguration(ArrayListSerializer.java:149)
      	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
      	... 13 more
      

      Attachments

        Issue Links

          Activity

            People

              tzulitai Tzu-Li (Gordon) Tai
              fassisrosa@hotmail.com Francisco Rosa
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: