Flink / FLINK-6515

KafkaConsumer checkpointing fails because of ClassLoader issues


Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      A job with a Kafka source and checkpointing enabled fails with the following exception:

      java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom Source -> Map -> Sink: Unnamed (1/1)
      	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1195)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
      	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1184)
      	... 5 more
      Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:520)
      	... 7 more
      Caused by: java.lang.RuntimeException: Could not copy instance of (KafkaTopicPartition{topic='test-input', partition=0},-1).
      	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:54)
      	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:32)
      	at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:71)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:368)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:380)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:191)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
      	... 12 more
      Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
      	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      	at java.lang.Class.forName0(Native Method)
      	at java.lang.Class.forName(Class.java:348)
      	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
      	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
      	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
      	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
      	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
      	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
      	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
      	at org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:371)
      	at org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:349)
      	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:52)
      	... 18 more
      
      

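      The problem appears to be in TypeSerializer.copy() (here JavaSerializer.copy()), which uses the wrong ClassLoader: as the trace shows, InstantiationUtil.clone() resolves classes through the application class loader (sun.misc.Launcher$AppClassLoader), which cannot see connector classes such as KafkaTopicPartition. Until recently this method was not called on this path, but recent changes around asynchronous checkpointing of operator state require deep copies of the operator ListState (DefaultOperatorStateBackend$PartitionableListState.deepCopy()), which call it.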
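      The failing deep copy is a serialize/deserialize round trip. Flink's InstantiationUtil already deserializes through a ClassLoaderObjectInputStream (visible in the trace), so what matters is which class loader is handed to it. Below is a minimal Java sketch of a class-loader-aware copy, assuming the caller can supply the user-code ClassLoader. ClassLoaderAwareCopy and LoaderObjectInputStream are hypothetical names for illustration, not Flink's API, and this is not necessarily how the issue was fixed in Flink. The point it shows is that resolveClass() consults the explicitly passed loader for every class descriptor in the stream, including nested fields such as the KafkaTopicPartition inside the copied element.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical helper (not Flink API): deep copy via Java serialization
// that resolves classes through a caller-supplied ClassLoader.
final class ClassLoaderAwareCopy {

    private ClassLoaderAwareCopy() {}

    // ObjectInputStream that resolves each class descriptor (top-level object
    // and nested fields alike) first through the supplied loader.
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // initialize = false: only look the class up, do not run static initializers
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to default resolution only if the supplied loader fails
                // (e.g. descriptors for primitive types such as "int")
                return super.resolveClass(desc);
            }
        }
    }

    // Deep-copies obj by writing it to a byte array and reading it back.
    // Assumption: the caller passes the user-code ClassLoader, rather than
    // obj.getClass().getClassLoader(), which may not see nested user classes.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T obj, ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new LoaderObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), userCodeClassLoader)) {
            return (T) in.readObject();
        }
    }
}
```

Called as ClassLoaderAwareCopy.copy(element, userCodeClassLoader), the copy finds classes that are only visible to the user-code loader. How a runtime obtains that loader at checkpoint time is specific to the runtime and outside the scope of this sketch.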

          People

            Assignee: Stephan Ewen (sewen)
            Reporter: Aljoscha Krettek (aljoscha)
            Votes: 0
            Watchers: 5