FLINK-6515

KafkaConsumer checkpointing fails because of ClassLoader issues

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Kafka Connector
    • Labels:
      None

      Description

      A job with Kafka and checkpointing enabled fails with:

      java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom Source -> Map -> Sink: Unnamed (1/1)
      	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1195)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
      	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1184)
      	... 5 more
      Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:520)
      	... 7 more
      Caused by: java.lang.RuntimeException: Could not copy instance of (KafkaTopicPartition{topic='test-input', partition=0},-1).
      	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:54)
      	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:32)
      	at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:71)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:368)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:380)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:191)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
      	... 12 more
      Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
      	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      	at java.lang.Class.forName0(Native Method)
      	at java.lang.Class.forName(Class.java:348)
      	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
      	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
      	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
      	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
      	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
      	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
      	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
      	at org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:371)
      	at org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:349)
      	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:52)
      	... 18 more
      
      

      The problem seems to be TypeSerializer.copy(), which uses the wrong ClassLoader. Until recently this method was not used, but recent changes around asynchronous checkpointing of operator state require deep copies of the operator ListState and thus call it.
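
To illustrate the mechanism described above, here is a minimal sketch of a copy-via-serialization helper that resolves classes against a caller-supplied ClassLoader instead of the application ClassLoader. The names SerializationCopy and ClassLoaderAwareInputStream are hypothetical and chosen for this example only; this is not the code of the actual Flink fix, just the general technique built on the standard java.io API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical helper, not part of Flink: deep-copies an object through
// Java serialization while resolving classes with a given ClassLoader.
final class SerializationCopy {

    /** ObjectInputStream that looks classes up in a caller-supplied ClassLoader. */
    static final class ClassLoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderAwareInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Resolve against the supplied loader (e.g. the user-code ClassLoader).
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes the value to bytes and reads it back using the given ClassLoader. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T value, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ClassLoaderAwareInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), classLoader)) {
            return (T) in.readObject();
        }
    }
}
```

If the ClassLoader passed to copy() cannot see the element class (as with KafkaTopicPartition in the user jar when only the application ClassLoader is consulted), deserialization fails with a ClassNotFoundException like the one in the trace above. Passing the user-code ClassLoader avoids that.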

        Activity

        tzulitai Tzu-Li (Gordon) Tai added a comment -

        Thanks a lot for reporting this Yassine Marzougui!
        Double checked this, it is indeed still a problem. Here's the JIRA for the follow-up issue: https://issues.apache.org/jira/browse/FLINK-6714.

        aljoscha Aljoscha Krettek added a comment -

        We should double-check; if this is not working, it would definitely be a release blocker.

        ymarzougui Yassine Marzougui added a comment -

        Yes, I used flink.version=1.4-SNAPSHOT in the user code.

        In addition, it looks like the fix (https://github.com/apache/flink/commit/6f8022e35e0a49d5dfffa0ab7fd1c964b1c1bf0d) didn't modify the Kafka connector code, and the exception I encountered actually comes from the new code:

        Caused by: org.apache.flink.util.FlinkRuntimeException: Could not copy element via serialization
        rmetzger Robert Metzger added a comment -

        Are you sure that you've rebuilt your user code using the correct Flink version?
        The Kafka connector code is usually located in the user jar, so you need to update both Flink and your user code.

        ymarzougui Yassine Marzougui added a comment -

        Hi,
        I'm still running into this issue on the release-1.3 branch and the latest master (1.4-SNAPSHOT, commit 546e2ad).
        I'm getting the following exception when a checkpoint is triggered:

        java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom Source -> Flat Map -> Map (4/8)
        	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1195)
        	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        	at java.lang.Thread.run(Thread.java:745)
        Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator Source: Custom Source -> Flat Map -> Map (4/8).
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:528)
        	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:111)
        	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1184)
        	... 5 more
        Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Source: Custom Source -> Flat Map -> Map (4/8).
        	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:409)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1158)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1090)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:655)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:591)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:522)
        	... 7 more
        Caused by: org.apache.flink.util.FlinkRuntimeException: Could not copy element via serialization: (KafkaTopicPartition{topic='pre-bid-urls', partition=7},-1)
        	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:53)
        	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:33)
        	at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
        	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:384)
        	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:396)
        	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:192)
        	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:394)
        	... 12 more
        Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
        	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        	at java.lang.Class.forName0(Native Method)
        	at java.lang.Class.forName(Class.java:348)
        	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
        	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
        	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
        	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
        	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
        	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
        	at org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:371)
        	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:51)
        	... 18 more
        

        Any idea what's going on?

        StephanEwen Stephan Ewen added a comment -

        Fixed in

        • 1.3.0 via cc3512ee9bbcc965278b43642cc9481f77027c4f
        • 1.4.0 via 6f8022e35e0a49d5dfffa0ab7fd1c964b1c1bf0d
        rmetzger Robert Metzger added a comment -

        When can we expect a fix for this?


          People

          • Assignee:
            StephanEwen Stephan Ewen
            Reporter:
            aljoscha Aljoscha Krettek
          • Votes:
            0
            Watchers:
            5
