Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Versions: jdbc-3.2.0, jdbc-3.1.2
Environment:
Flink 1.19.0
Flink JDBC Connector 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca)
JDK 11 (Temurin)
Description
I am using the latest flink-connector-jdbc code from the main branch, which is currently 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).
When a job is interrupted while reading data from the JDBC source (for example, by a TaskManager outage), it cannot recover and fails with the following exception:
java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
    at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
    at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
    at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Unknown Source)
    at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109)
    at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69)
    ... 22 more
In our deployment, we embed the JDBC connector classes in the job JAR file. This means that the class org.apache.flink.connector.jdbc.source.split.CheckpointedOffset is visible only to the FlinkUserCodeClassLoader and not to the AppClassLoader. I believe the problem is in the following code snippet, which uses the class loader of the JDK's DataInputStream class:
public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
        throws IOException, ClassNotFoundException {
    // ....
    // Some lines skipped
    CheckpointedOffset chkOffset =
            InstantiationUtil.deserializeObject(chkOffsetBytes, in.getClass().getClassLoader());
    return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
}
If I change it to the following:
public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
        throws IOException, ClassNotFoundException {
    // ....
    // Some lines skipped
    CheckpointedOffset chkOffset =
            InstantiationUtil.deserializeObject(chkOffsetBytes, CheckpointedOffset.class.getClassLoader());
    return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
}
Everything works as expected.
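For anyone who wants to see the mechanism without a Flink cluster, here is a minimal, Flink-free sketch. It relies on one JDK fact: DataInputStream lives in java.base and is loaded by the bootstrap class loader, so in.getClass().getClassLoader() returns null. The stack trace above is consistent with a null loader falling through to the default ObjectInputStream resolution, which ends up in the AppClassLoader. The names ClassLoaderDemo, Offset, and LoaderAwareObjectInputStream are hypothetical stand-ins for illustration, not connector or Flink classes, and the resolveClass logic is my assumption about the general pattern rather than a copy of InstantiationUtil.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ClassLoaderDemo {

    /** Hypothetical stand-in for CheckpointedOffset; not the connector's class. */
    public static class Offset implements Serializable {
        private static final long serialVersionUID = 1L;
        public final long value;

        public Offset(long value) {
            this.value = value;
        }
    }

    /** ObjectInputStream that resolves classes through an explicitly supplied loader. */
    static class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (loader == null) {
                // No loader given: fall back to the JDK default resolution,
                // which does not know about classes from a user-code loader.
                return super.resolveClass(desc);
            }
            return Class.forName(desc.getName(), false, loader);
        }
    }

    public static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    public static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(new byte[0]));

        // JDK core classes come from the bootstrap loader, reported as null.
        System.out.println("DataInputStream loader: " + in.getClass().getClassLoader());

        // The class being deserialized knows which loader defined it.
        System.out.println("Offset loader is null: " + (Offset.class.getClassLoader() == null));

        // Round trip using the loader of the target class, as in the proposed fix.
        Offset restored =
                (Offset) deserialize(serialize(new Offset(42L)), Offset.class.getClassLoader());
        System.out.println("restored value: " + restored.value);
    }
}
```

In this standalone program both loaders can find Offset, so it does not reproduce the failure. It only shows that the stream's own class carries no useful loader, while the target class's loader is the one that defined it.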