FLINK-7180

CoGroupStream perform checkpoint failed

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Resolved
    • Affects Version/s: 1.3.1
    • Fix Version/s: 1.3.2
    • Component/s: DataStream API
    • Labels:
      None

      Description

      When using the CoGroup API with checkpointing enabled, the job fails while performing a checkpoint, e.g.:

              input1.coGroup(input2)
                      .where(new KeySelector<String, String>() {
                          @Override
                          public String getKey(String value) throws Exception {
                              return value;
                          }
                      })
                      .equalTo(new KeySelector<String, String>() {
                          @Override
                          public String getKey(String value) throws Exception {
                              return value;
                          }
                      })
                      .window(SlothJoinWindow.create())
                      .trigger(new SlothWindowTrigger(0))
                      .apply(new CoGroupFunction<String, String, String>() {
                          @Override
                          public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) throws Exception {
                              String outputStr = "first:" + first + " , second:" + second;
                              System.out.println(outputStr);
                              out.collect(outputStr);
                          }
                      })
                      .keyBy(new KeySelector<String, String>() {
                          @Override
                          public String getKey(String value) throws Exception {
                              return value;
                          }
                      })
                      .print();
      

        Activity

        aljoscha Aljoscha Krettek added a comment -

        Cool, thanks for letting us know!

        sihuazhou Sihua Zhou added a comment -

        I checked my pom.xml and I was actually using Flink 1.3.0. After upgrading to 1.3.1, the problem no longer appears. Please ignore this issue; I will close it.

        aljoscha Aljoscha Krettek added a comment -

        No problem. Are you 100% sure that you're using Flink 1.3.1? This problem should have been fixed with FLINK-6808. If not, then we'll have to investigate further, of course.
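        One way to double-check which jar (and therefore which version) a class is really loaded from at runtime is sketched below. This is only an illustration, not part of the ticket: `ClasspathVersionCheck` and `describe` are hypothetical names, and whether the Flink jars populate `Implementation-Version` in their manifest is an assumption, so the jar location is printed as well.

```java
import java.net.URL;
import java.security.CodeSource;

final class ClasspathVersionCheck {

    /**
     * Describes which version the class's jar manifest declares and where the
     * class was loaded from. Both values fall back to "unknown" when missing.
     */
    static String describe(Class<?> cls) {
        Package pkg = cls.getPackage();
        // Read from the jar manifest; null if the manifest does not declare it.
        String version = (pkg == null) ? null : pkg.getImplementationVersion();
        // The code source is null for classes loaded by the bootstrap loader.
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        return cls.getName()
                + " version=" + (version == null ? "unknown" : version)
                + " location=" + (location == null ? "unknown" : location);
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a fully qualified class name; defaults to a JDK class.
        String className = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(describe(Class.forName(className)));
    }
}
```

        Running it inside the job's classpath with a class named in the stack trace, for example `org.apache.flink.streaming.api.datastream.CoGroupedStreams`, should show the jar file that is actually on the classpath, which can then be compared with the version declared in pom.xml.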

        sihuazhou Sihua Zhou added a comment - - edited

        Aljoscha Krettek, sorry for the late reply; I was on a train the whole time. Here is the stack trace.

        AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(sloth.SlothJoinWindow@e45f292, ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e42116a0}, sloth.SlothWindowTrigger@31a5c39e, WindowedStream.apply(CoGroupedStreams.java:300)) (2/4).}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:963)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
        Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(sloth.SlothJoinWindow@e45f292, ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e42116a0}, sloth.SlothWindowTrigger@31a5c39e, WindowedStream.apply(CoGroupedStreams.java:300)) (2/4).
        ... 6 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
        ... 5 more
        Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957)
        ... 5 more
        Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
        Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
        at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
        at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
        at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.snapshotConfiguration(ListSerializer.java:183)
        at org.apache.flink.api.common.typeutils.base.ListSerializer.snapshotConfiguration(ListSerializer.java:47)
        at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:591)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:510)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:407)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:389)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
        ... 5 more
        [CIRCULAR REFERENCE:java.lang.UnsupportedOperationException: This serializer is not registered for managed state.]
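
        The cause chain in the trace above (a list serializer asking its element serializer for a configuration snapshot, the element serializer refusing, and the asynchronous task surfacing that as an ExecutionException) can be modeled without Flink. The following is a minimal sketch under that reading of the trace: every type and method name in it is hypothetical and only mirrors the shape of the failure; it is not Flink's actual implementation.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

final class SnapshotFailureSketch {

    /** Hypothetical stand-in for a serializer that can snapshot its configuration. */
    interface Serializer {
        String snapshotConfiguration();
    }

    /** Models an element serializer that does not support managed state. */
    static final class UnionLikeSerializer implements Serializer {
        @Override
        public String snapshotConfiguration() {
            throw new UnsupportedOperationException(
                    "This serializer is not registered for managed state.");
        }
    }

    /** Models a list serializer whose snapshot includes its element serializer's snapshot. */
    static final class ListLikeSerializer implements Serializer {
        private final Serializer element;

        ListLikeSerializer(Serializer element) {
            this.element = element;
        }

        @Override
        public String snapshotConfiguration() {
            // Delegation is what pulls the unsupported serializer into the checkpoint.
            return "list(" + element.snapshotConfiguration() + ")";
        }
    }

    /** Runs the snapshot as a future and returns the failure cause, or null on success. */
    static Throwable materialize(Serializer stateSerializer) {
        FutureTask<String> snapshot = new FutureTask<>(stateSerializer::snapshotConfiguration);
        snapshot.run(); // executed inline here; a real system would use a thread pool
        try {
            snapshot.get();
            return null;
        } catch (ExecutionException e) {
            // The future wraps the original exception, as in the trace.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable cause = materialize(new ListLikeSerializer(new UnionLikeSerializer()));
        System.out.println("checkpoint failed: " + cause);
    }
}
```

        In this model the failure only shows up when a snapshot is taken, which matches the report that the job runs until the first checkpoint is attempted.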

        aljoscha Aljoscha Krettek added a comment -

        Sihua Zhou Could you please attach a stack trace or some output that shows how the job fails?


          People

          • Assignee:
            sihuazhou Sihua Zhou
            Reporter:
            sihuazhou Sihua Zhou