FLINK-6808

Stream join fails when checkpointing is enabled

    Details

      Description

      Joining two streams with checkpointing enabled fails in Flink 1.3.0. The same job worked in the previous 1.2 version. The following example reproduces the failure:

      Example
          import org.apache.flink.api.common.functions.JoinFunction;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.datastream.DataStreamSource;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
          import org.apache.flink.streaming.api.windowing.time.Time;

          public static void main(String[] args) throws Exception {

              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // enable checkpoints, triggered every 5000 ms
              env.enableCheckpointing(5000);

              // create two streams with overlapping ranges (2000..5000)
              DataStreamSource<Long> one = env.generateSequence(0, 5000);
              DataStreamSource<Long> two = env.generateSequence(2000, 15000);

              // process both, sleeping per element so that a checkpoint is triggered while the job runs
              DataStream<String> oneProcessed = one
                      .map(oneValue -> {
                          Thread.sleep(1000);
                          return "val-" + oneValue;
                      });
              DataStream<String> twoProcessed = two
                      .map(twoValue -> {
                          Thread.sleep(1000);
                          return "val-" + twoValue;
                      });

              // join the two streams on string equality, in 5-second processing-time windows
              DataStream<String> joinedStreams = oneProcessed
                      .join(twoProcessed)
                      .where(String::toString)
                      .equalTo(String::toString)
                      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                      .apply(new JoinFunction<String, String, String>() {
                          @Override
                          public String join(String oneValue, String twoValue) {
                              // nothing really relevant, just concatenate the strings
                              return oneValue + "+" + twoValue;
                          }
                      });

              // output results
              joinedStreams.print();

              env.execute("Issue with stream join and checkpoints");
          }
      

      Stack trace:

      java.lang.Exception: Could not perform checkpoint 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:550)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
      	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not complete snapshot 1 for operator TriggerWindow(TumblingProcessingTimeWindows(5000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@3769cce0}, ProcessingTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:300)) -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:406)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:542)
      	... 8 more
      Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
      	at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
      	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
      	at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
      	at org.apache.flink.runtime.state.ArrayListSerializer.snapshotConfiguration(ArrayListSerializer.java:149)
      	at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:267)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:396)
      	... 13 more
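
The innermost frames of the trace show a list serializer's snapshotConfiguration() asking its nested element serializer (CoGroupedStreams$UnionSerializer) for a configuration snapshot, which throws. The following is a minimal, Flink-free sketch of that call pattern only. Every class and method name in it (NestedSnapshotSketch, SketchSerializer, trySnapshot and so on) is hypothetical and chosen for illustration; none of it is Flink's actual implementation.

```java
public class NestedSnapshotSketch {

    /** Hypothetical stand-in for a serializer that can describe its own configuration. */
    interface SketchSerializer {
        String snapshotConfiguration();
    }

    /** Models an ordinary serializer that supports configuration snapshots. */
    static final class StringSketchSerializer implements SketchSerializer {
        @Override
        public String snapshotConfiguration() {
            return "string";
        }
    }

    /** Models a serializer that refuses to produce a configuration snapshot. */
    static final class UnsupportedSnapshotSerializer implements SketchSerializer {
        @Override
        public String snapshotConfiguration() {
            throw new UnsupportedOperationException(
                    "This serializer is not registered for managed state.");
        }
    }

    /** Models a list serializer whose snapshot embeds the element serializer's snapshot. */
    static final class ListSketchSerializer implements SketchSerializer {
        private final SketchSerializer elementSerializer;

        ListSketchSerializer(SketchSerializer elementSerializer) {
            this.elementSerializer = elementSerializer;
        }

        @Override
        public String snapshotConfiguration() {
            // The nested call is where the exception escapes in the failing case.
            return "list<" + elementSerializer.snapshotConfiguration() + ">";
        }
    }

    /** Attempts a snapshot and reports the outcome as a string instead of throwing. */
    static String trySnapshot(SketchSerializer serializer) {
        try {
            return "ok: " + serializer.snapshotConfiguration();
        } catch (UnsupportedOperationException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(trySnapshot(
                new ListSketchSerializer(new StringSketchSerializer())));
        System.out.println(trySnapshot(
                new ListSketchSerializer(new UnsupportedSnapshotSerializer())));
    }
}
```

In this sketch the outer serializer cannot complete its snapshot once any nested serializer refuses, which mirrors how the UnsupportedOperationException in the trace propagates up and fails the whole checkpoint.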
      


              People

              • Assignee:
                tzulitai Tzu-Li (Gordon) Tai
                Reporter:
                fassisrosa@hotmail.com Francisco Rosa