Flink / FLINK-31243

KryoSerializer when loaded from user code classloader cannot load Scala extensions from app classloader


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.15.3, 1.16.1
    • Fix Version/s: None
    • Component/s: API / Core
    • Labels: None
    • Environment: OS: Amazon Linux 2; JVM: Amazon Corretto 11

    Description

      The KryoSerializer uses Class.forName() to load the Scala extensions dynamically by name. The one-argument Class.forName(String) resolves the name with the classloader that defined the calling class, so only KryoSerializer's own classloader is consulted when looking for these extensions. By default, KryoSerializer is loaded from the application classloader, so unless the flink-scala artifact is available to the application classloader, the Scala extensions cannot be loaded. Scala applications that bundle flink-scala in their own jar therefore cannot benefit from the Scala extensions to the Kryo serializer.
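      The distinction between the two Class.forName overloads is the crux of this issue. The sketch below is a hypothetical helper (not Flink's API, and not how KryoSerializer currently behaves). It first tries the loader that defined the calling class, which is what the one-argument form uses, and then falls back to the thread context classloader. It assumes that the context classloader points at the user code classloader when the serializer is initialized.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of Flink: resolves a class name against the
// caller's own classloader first and then the thread context classloader.
final class ClassLoaderLookupSketch {

    private ClassLoaderLookupSketch() {}

    static Class<?> loadWithFallback(String className) throws ClassNotFoundException {
        // Insertion order is lookup order; the Set drops the duplicate when both loaders are the same.
        Set<ClassLoader> candidates = new LinkedHashSet<>();
        // Loader that defined this class: what Class.forName(String) would use here.
        ClassLoader own = ClassLoaderLookupSketch.class.getClassLoader();
        if (own != null) {
            candidates.add(own);
        }
        // Assumption: in a Flink job this is the user code classloader.
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        if (context != null) {
            candidates.add(context);
        }
        if (candidates.isEmpty()) {
            return Class.forName(className);
        }
        ClassNotFoundException lastFailure = null;
        for (ClassLoader loader : candidates) {
            try {
                // initialize = true mirrors the behaviour of Class.forName(String).
                return Class.forName(className, true, loader);
            } catch (ClassNotFoundException e) {
                lastFailure = e;
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(loadWithFallback("java.lang.String").getName());
        try {
            loadWithFallback("org.apache.flink.runtime.types.FlinkScalaKryoInstantiator");
            System.out.println("Scala Kryo extensions are visible");
        } catch (ClassNotFoundException e) {
            System.out.println("Scala Kryo extensions are not visible: " + e.getMessage());
        }
    }
}
```

      Because the caller's own loader is tried first, a fallback of this kind keeps today's behaviour whenever the extensions are already on the application classpath. It only changes the outcome when they are visible solely to the user code classloader.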

      The exception looks like this:

      java.lang.ClassNotFoundException: org.apache.flink.runtime.types.FlinkScalaKryoInstantiator
          at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
          at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
          at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
          at java.base/java.lang.Class.forName0(Native Method)
          at java.base/java.lang.Class.forName(Class.java:315)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:486)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:521)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryo(KryoSerializer.java:720)
          at software.amazon.kinesisanalytics.kryotest.Main.main(Main.java:16)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.base/java.lang.reflect.Method.invoke(Method.java:566)
          at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
          at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
          at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
          at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
          at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
          at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:239)
          at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)

      Example code resulting in this issue:

      Main class for the Flink application:

      package software.amazon.kinesisanalytics.kryotest;
      
      import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      
      import java.io.Serializable;
      
      public class Main {
          private static class Something implements Serializable { // any Serializable type triggers Kryo initialization
              private static final long serialVersionUID = 289034745902347830L;
          }
      
          public static void main(String... args) {
              StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
              KryoSerializer<Something> serializer = new KryoSerializer<>(Something.class, executionEnvironment.getConfig());
              serializer.getKryo(); // initializes Kryo, which tries to load FlinkScalaKryoInstantiator via Class.forName()
          }
      }
      

      build.gradle for the Flink application:

      plugins {
          id 'application'
          id 'java'
          id 'com.github.johnrengelman.shadow' version '7.1.2'
      }
      
      group 'software.amazon.kinesisanalytics'
      version '0.1'
      
      repositories {
          mavenCentral()
      }
      
      dependencies {
          compileOnly 'org.apache.flink:flink-core:1.15.2'
          compileOnly 'org.apache.flink:flink-streaming-java:1.15.2'
          implementation 'org.apache.flink:flink-scala_2.12:1.15.2'
      }
      
      shadowJar {
          dependencies {
              exclude(dependency('com.esotericsoftware.kryo:.*:.*'))
              exclude(dependency('com.esotericsoftware.minlog:.*:.*'))
              exclude(dependency('com.twitter:.*:.*'))
              exclude(dependency('org.apache.flink:flink-core:.*'))
              exclude(dependency('org.apache.flink:flink-streaming-java:.*'))
              exclude(dependency('org.scala-lang:.*:.*'))
          }
      }
      
      mainClassName = 'software.amazon.kinesisanalytics.kryotest.Main'
       

      Note that the application jar includes flink-scala but includes neither Kryo itself nor flink-core.

      Placing flink-scala on the application classpath eliminates the error. However, as I understand it, the point of removing Scala from the Flink application classloader was to allow Scala dependencies to be loaded only by the user code classloader. This issue prevents that from being achieved for the Scala extensions to the Kryo serializer.
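      A small diagnostic like the following can check the classloader split from inside the job. It is a hypothetical helper (not part of Flink) that compares what the loader that defined KryoSerializer can see with what the thread context classloader can see. It relies only on reflection, so it compiles without Flink on the classpath. Calling ClassVisibilityReport.main from the application's main method is an assumed usage. It also assumes that the context classloader is the user code classloader while main runs. Under the setup described above, one would expect FlinkScalaKryoInstantiator to be "not visible" through KryoSerializer's loader, but that expectation has not been verified here.

```java
// Hypothetical diagnostic, not part of Flink: reports which classloader (if any)
// can see a class, so the application / user-code classloader split can be checked.
final class ClassVisibilityReport {

    private ClassVisibilityReport() {}

    /** Looks up className through the given loader without running static initializers. */
    static String describe(String className, ClassLoader loader) {
        try {
            Class<?> clazz = Class.forName(className, false, loader);
            ClassLoader definer = clazz.getClassLoader();
            // A null defining loader means the class came from the bootstrap loader.
            return className + " -> defined by " + (definer == null ? "bootstrap" : definer.toString());
        } catch (ClassNotFoundException e) {
            return className + " -> not visible";
        }
    }

    public static void main(String[] args) {
        String serializerName = "org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer";
        String instantiatorName = "org.apache.flink.runtime.types.FlinkScalaKryoInstantiator";
        ClassLoader context = Thread.currentThread().getContextClassLoader();

        System.out.println(describe(serializerName, context));
        try {
            // The loader that a one-argument Class.forName() inside KryoSerializer consults.
            ClassLoader serializerLoader = Class.forName(serializerName, false, context).getClassLoader();
            System.out.println("via KryoSerializer's loader: " + describe(instantiatorName, serializerLoader));
        } catch (ClassNotFoundException e) {
            System.out.println("KryoSerializer is not on the classpath");
        }
        System.out.println("via context loader: " + describe(instantiatorName, context));
    }
}
```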


          People

            Assignee: Unassigned
            Reporter: Amit Gurdasani (amitgurd)
            Votes: 1
            Watchers: 3
