Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Fixed
1.3.0
None
Description
A job with Kafka and checkpointing enabled fails with:
java.lang.Exception: Error while triggering checkpoint 1 for Source: Custom Source -> Map -> Sink: Unnamed (1/1)
    at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1195)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not perform checkpoint 1 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
    at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1184)
    ... 5 more
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1157)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1089)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:653)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:589)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:520)
    ... 7 more
Caused by: java.lang.RuntimeException: Could not copy instance of (KafkaTopicPartition{topic='test-input', partition=0},-1).
    at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:54)
    at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:32)
    at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:71)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:368)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:380)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
    ... 12 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
    at org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:371)
    at org.apache.flink.util.InstantiationUtil.clone(InstantiationUtil.java:349)
    at org.apache.flink.runtime.state.JavaSerializer.copy(JavaSerializer.java:52)
    ... 18 more
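The problem seems to be TypeSerializer.copy() (here JavaSerializer.copy()), which uses the wrong ClassLoader: the trace shows the class lookup going through sun.misc.Launcher$AppClassLoader, which cannot find the connector class KafkaTopicPartition. Until recently this method was not called on this path, but recent changes around asynchronous checkpointing of operator state require deep copies of the operator ListState and therefore call it.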
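
For illustration only, here is a minimal plain-JDK sketch of a serialization-based deep copy that resolves classes against an explicitly supplied ClassLoader rather than whatever loader happens to be the default. It is not the Flink code and not necessarily how the issue was fixed; the names SerializationClone, LoaderAwareInputStream, deepCopy and userCodeLoader are hypothetical and exist only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical helper, not part of Flink: deep copy via Java serialization.
final class SerializationClone {

    // ObjectInputStream that looks classes up in a caller-supplied ClassLoader.
    private static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Resolve against the supplied loader (assumed to see user-code classes).
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Default resolution also covers primitive type descriptors.
                return super.resolveClass(desc);
            }
        }
    }

    // Serializes the value to bytes and reads it back using userCodeLoader.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T value, ClassLoader userCodeLoader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new LoaderAwareInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), userCodeLoader)) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in data; a real job would copy its operator state elements.
        ArrayList<String> original = new ArrayList<>(Arrays.asList("test-input", "0"));
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        ArrayList<String> copy = deepCopy(original, loader);
        System.out.println("equal content: " + copy.equals(original));
        System.out.println("distinct instance: " + (copy != original));
    }
}
```

The point of the sketch is that the loader is an explicit argument, so the caller (who knows the user-code ClassLoader) decides how classes are resolved during the copy.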