FLINK-35542

ClassNotFoundException when deserializing CheckpointedOffset

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: jdbc-3.2.0, jdbc-3.1.2
    • Fix Version/s: jdbc-3.3.0
    • Component/s: Connectors / JDBC
    • Environment: Flink 1.19.0

      Flink JDBC Connector 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca)

      JDK 11 (Temurin)

    Description

      I am using the latest flink-connector-jdbc code from the main branch, which is currently 3.2-SNAPSHOT (commit 2defbbcf4fc550a76dd9c664e1eed7d261e028ca).

       

      When a job is interrupted while reading data from the JDBC source (for example, by a TaskManager outage), it cannot recover and fails with the following exception:

      java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
          at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:71)
          at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:34)
          at org.apache.flink.connector.base.source.hybrid.HybridSourceSplit.unwrapSplit(HybridSourceSplit.java:122)
          at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.addSplits(HybridSourceReader.java:158)
          at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.setCurrentReader(HybridSourceReader.java:247)
          at org.apache.flink.connector.base.source.hybrid.HybridSourceReader.handleSourceEvents(HybridSourceReader.java:186)
          at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:571)
          at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72)
          at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540)
          at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
          at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
          at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
          at java.base/java.lang.Thread.run(Unknown Source)
      Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.jdbc.source.split.CheckpointedOffset
          at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
          at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
          at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
          at java.base/java.lang.Class.forName0(Native Method)
          at java.base/java.lang.Class.forName(Unknown Source)
          at java.base/java.io.ObjectInputStream.resolveClass(Unknown Source)
          at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:92)
          at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
          at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
          at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
          at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
          at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
          at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
          at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserializeJdbcSourceSplit(JdbcSourceSplitSerializer.java:109)
          at org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer.deserialize(JdbcSourceSplitSerializer.java:69)
          ... 22 more 

       

      In our deployment, we embed the JDBC connector classes in the job JAR file. This means that the class org.apache.flink.connector.jdbc.source.split.CheckpointedOffset is visible only to the FlinkUserCodeClassLoader and not to the AppClassLoader. I believe the problem is in the following code snippet, which uses the class loader of the JDK's DataInputStream class:

      public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
              throws IOException, ClassNotFoundException {
          // ....
          // Some lines skipped 
          CheckpointedOffset chkOffset =
                  InstantiationUtil.deserializeObject(chkOffsetBytes, in.getClass().getClassLoader());
      
          return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
      } 
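
      The suspected mechanism can be sketched with JDK classes alone. This is a minimal illustration, not connector code: the class and method names below are hypothetical. java.io.DataInputStream belongs to the JDK core and is loaded by the bootstrap class loader, so getClassLoader() returns null for it. A parent loader also cannot see classes that exist only in a child loader. In the sketch, the platform class loader stands in for the AppClassLoader and the application loader stands in for the FlinkUserCodeClassLoader.

```java
import java.io.DataInputStream;

/** Hypothetical demo class; not part of flink-connector-jdbc. */
public class LoaderVisibilitySketch {

    /** Returns true if the given loader (null means bootstrap) can load the named class. */
    public static boolean visibleTo(String className, ClassLoader loader) {
        try {
            // initialize=false: we only care whether the class can be found.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** JDK core classes such as DataInputStream report a null class loader. */
    public static boolean jdkStreamLoaderIsNull() {
        return DataInputStream.class.getClassLoader() == null;
    }

    public static void main(String[] args) {
        String self = LoaderVisibilitySketch.class.getName();
        System.out.println("DataInputStream loader is null: " + jdkStreamLoaderIsNull());
        System.out.println("visible to own loader: "
                + visibleTo(self, LoaderVisibilitySketch.class.getClassLoader()));
        // The platform loader is a parent of the application loader and
        // therefore cannot see application classes.
        System.out.println("visible to platform loader: "
                + visibleTo(self, ClassLoader.getPlatformClassLoader()));
    }
}
```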

      I changed it to the following:

      public JdbcSourceSplit deserializeJdbcSourceSplit(DataInputStream in)
              throws IOException, ClassNotFoundException {
          // .... 
          // Some lines skipped
          CheckpointedOffset chkOffset =
                  InstantiationUtil.deserializeObject(chkOffsetBytes, CheckpointedOffset.class.getClassLoader());
      
          return new JdbcSourceSplit(id, sqlTemplate, params, offset, chkOffset);
      } 

      With this change, everything works as expected.
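
      A likely reason the change helps is that class resolution then goes through the loader that loaded the connector classes themselves. The sketch below is self-contained and makes several assumptions: Offset is a hypothetical stand-in for CheckpointedOffset, and LoaderAwareObjectInputStream is a simplified stand-in for the InstantiationUtil$ClassLoaderObjectInputStream frame seen in the stack trace, not Flink's actual implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical demo class; not part of flink-connector-jdbc. */
public class ExplicitLoaderSketch {

    /** Hypothetical stand-in for CheckpointedOffset. */
    public static class Offset implements Serializable {
        private static final long serialVersionUID = 1L;
        public final long value;

        public Offset(long value) {
            this.value = value;
        }
    }

    /** Resolves classes through an explicit loader; falls back to the JDK default when it is null. */
    static class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            if (loader != null) {
                return Class.forName(desc.getName(), false, loader);
            }
            // With a null loader, resolution is left to ObjectInputStream itself.
            return super.resolveClass(desc);
        }
    }

    /** Serializes an Offset and reads it back using the given class loader. */
    public static long roundTrip(long value, ClassLoader loader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new Offset(value));
            }
            try (ObjectInputStream in = new LoaderAwareObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), loader)) {
                return ((Offset) in.readObject()).value;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same idea as the fix: pass the loader of the class being deserialized.
        System.out.println(roundTrip(42L, Offset.class.getClassLoader()));
    }
}
```

      Passing the loader of the target class keeps the lookup independent of how the input stream was created, which is the property the modified snippet relies on.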
       

       


            People

              Assignee: janosc Jan Gurda
              Reporter: janosc Jan Gurda