Flink / FLINK-6515

KafkaConsumer checkpointing fails because of ClassLoader issues

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: Kafka Connector
    • Labels: None

      Description

      A job that uses the Kafka consumer as a source, with checkpointing enabled, fails with:

      java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom Source -> Map -> Sink: Unnamed (1/1)
      	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1195)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
      	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
      	at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1184)
      	... 5 more
      Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1).
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:520)
      	... 7 more
      Caused by: java.lang.RuntimeException: Could not copy instance of (KafkaTopicPartition{topic='test-input', partition=0},-1).
      	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:54)
      	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:32)
      	at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:71)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:368)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:380)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:191)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
      	... 12 more
      Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
      	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      	at java.lang.Class.forName0(Native Method)
      	at java.lang.Class.forName(Class.java:348)
      	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
      	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
      	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
      	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
      	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
      	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
      	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
      	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
      	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
      	at org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:371)
      	at org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:349)
      	at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:52)
      	... 18 more

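      The problem seems to be TypeSerializer.copy(), which uses the wrong ClassLoader. In the trace, JavaSerializer.copy() deep-copies each state entry through InstantiationUtil.clone(), and the deserialization step resolves classes via sun.misc.Launcher$AppClassLoader (the system class loader) instead of the user-code class loader, so KafkaTopicPartition, which comes from the job's user code, cannot be found. Until recently copy() was not called on this path, but recent changes around asynchronous checkpointing of operator state require deep copies of the operator ListState, so DefaultOperatorStateBackend.snapshot() now calls it.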
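      The failure mode can be reproduced outside Flink with plain JDK serialization. The following is a minimal sketch, not Flink's actual code or fix: Partition is a hypothetical stand-in for KafkaTopicPartition, and LoaderAwareObjectInputStream / deepCopy are illustrative names loosely modeled on what the trace shows InstantiationUtil$ClassLoaderObjectInputStream and InstantiationUtil.clone() doing. A serialize-then-deserialize deep copy only works if the loader used in resolveClass() can see every class in the object graph.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderCopyDemo {

    /** Hypothetical stand-in for a user-code class such as KafkaTopicPartition. */
    static final class Partition implements Serializable {
        private static final long serialVersionUID = 1L;
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return "Partition{topic='" + topic + "', partition=" + partition + "}";
        }
    }

    /** Hypothetical ObjectInputStream that resolves every class through an explicitly supplied loader. */
    static final class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Look the class up in the given loader; throws ClassNotFoundException if it is not visible there.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Deep copy via Java serialization, resolving classes with the caller-supplied loader. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T obj, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new LoaderAwareObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), loader)) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Partition original = new Partition("test-input", 0);

        // A loader whose parent is the bootstrap loader only sees JDK classes, so it cannot find Partition.
        try (URLClassLoader jdkOnly = new URLClassLoader(new URL[0], null)) {
            deepCopy(original, jdkOnly);
            System.out.println("unexpected: copy succeeded");
        } catch (ClassNotFoundException e) {
            System.out.println("wrong loader: ClassNotFoundException " + e.getMessage());
        }

        // The loader that actually defined Partition can resolve it, so the copy succeeds.
        Partition copy = deepCopy(original, Partition.class.getClassLoader());
        System.out.println("right loader: " + copy);
    }
}
```

      The design point the sketch isolates is that the caller passes the loader explicitly. Whichever loader is chosen must be able to see every class in the copied object, not just the top-level one, which is why a copy that resolves through the system class loader fails for classes shipped in the job's user code.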


            People

            • Assignee: Stephan Ewen (StephanEwen)
            • Reporter: Aljoscha Krettek (aljoscha)
            • Votes: 0
            • Watchers: 5
