FLINK-30072

Cannot assign instance of SerializedLambda to field KeyGroupStreamPartitioner.keySelector

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.16.0
    • Fix Version/s: None
    • Component/s: Runtime / Task
    • Labels: None

    Description

      In application mode, if the usrlib directories of the JM and TM differ (e.g. the same jars under different file names), the job fails and this cryptic exception shows up on the JM:

      2022-11-17 09:55:12,968 INFO  org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Restarting job.
      org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
          at org.apache.flink.streaming.api.graph.StreamConfig.getVertexNonChainedOutputs(StreamConfig.java:537) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1600) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1584) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:408) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:362) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:335) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:327) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:317) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.<init>(SourceOperatorStreamTask.java:84) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at jdk.internal.reflect.GeneratedConstructorAccessor38.newInstance(Unknown Source) ~[?:?]
          at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
          at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
          at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1589) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:714) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at java.lang.Thread.run(Unknown Source) ~[?:?]
      Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
          at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown Source) ~[?:?]
          at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(Unknown Source) ~[?:?]
          at java.io.ObjectStreamClass.checkObjFieldValueTypes(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.defaultCheckFieldValues(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
          at java.util.ArrayList.readObject(Unknown Source) ~[?:?]
          at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) ~[?:?]
          at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
          at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
          at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
          at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          at org.apache.flink.streaming.api.graph.StreamConfig.getVertexNonChainedOutputs(StreamConfig.java:533) ~[flink-dist-1.16.0-ok.0.jar:1.16.0-ok.0]
          ... 15 more
      

      It seems that our normal exception reporting is not really working here (which may also be a problem for more common failures). The TM logs do contain the actual exception, though:

      2022-11-17 10:11:43,551 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - TumblingEventTimeWindows -> Sink: Print to Std. Out (1/1)#3 (cc8e6b0246079230c5ac1bc335c70163_c27dcf7b54ef6bfd6cff02ca8870b681_0_3) switched from INITIALIZING to FAILED with failure cause: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.immerok.cloud.examples.WindowAggregation$MetadataEnrichingWindowFunction
      ClassLoader info: URL ClassLoader:
          file: 'usrlib/window-agg-2.jar' (missing)
      Class not resolvable through given classloader.
          at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:397)
          at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:162)
          at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:681)
          at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
          at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
          at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
          at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
          at java.base/java.lang.Thread.run(Unknown Source)
      Caused by: java.lang.ClassNotFoundException: com.immerok.cloud.examples.WindowAggregation$MetadataEnrichingWindowFunction
          at java.base/java.net.URLClassLoader.findClass(Unknown Source)
          at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
          at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
          at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
          at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
          at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
          at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
          at java.base/java.lang.Class.forName0(Native Method)
          at java.base/java.lang.Class.forName(Unknown Source)
          at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
          at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
          at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
          at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
          at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
          at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
          at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
          at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
          at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
          at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
          at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
          at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
          at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
          at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
          at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
          at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
          at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
          at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
          at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
          at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
          at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
          ... 9 more
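
      A possible reading of how the two traces relate, offered as an assumption rather than a confirmed diagnosis: a serializable lambda such as a KeySelector is not written to the stream as its own class. Java serialization replaces it with a java.lang.invoke.SerializedLambda that records the capturing class, and the lambda is only rebuilt on the reading side if that capturing class can be loaded. If the user jar is missing from the TM's classloader, the raw SerializedLambda may be what ends up being assigned to KeyGroupStreamPartitioner.keySelector. The sketch below only demonstrates the first half (what gets written); the Selector interface and the class name SerializedLambdaDemo are hypothetical stand-ins, not Flink code.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

public class SerializedLambdaDemo {

    /** Hypothetical stand-in for a serializable functional interface such as Flink's KeySelector. */
    public interface Selector<T, K> extends Serializable {
        K getKey(T value);
    }

    /**
     * Returns the object that Java serialization would write in place of the given
     * serializable lambda, by invoking the compiler-generated writeReplace method.
     */
    public static SerializedLambda serializedForm(Serializable lambda) {
        try {
            Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
            writeReplace.setAccessible(true);
            return (SerializedLambda) writeReplace.invoke(lambda);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Not a serializable lambda: " + lambda, e);
        }
    }

    public static void main(String[] args) {
        Selector<String, Integer> byLength = s -> s.length();
        SerializedLambda form = serializedForm(byLength);

        // The stream carries only these names; the reading side must be able to load
        // the capturing class to turn the SerializedLambda back into a Selector.
        System.out.println("capturing class:      " + form.getCapturingClass());
        System.out.println("functional interface: " + form.getFunctionalInterfaceClass());
        System.out.println("interface method:     " + form.getFunctionalInterfaceMethodName());
    }
}
```

      The capturing class printed here lives in the user jar, which is why a jar that cannot be found on the TM could plausibly surface as a ClassCastException instead of a ClassNotFoundException.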
      

      How to reproduce:

      1. On the JM, place the job jar at <flink>/usrlib/window-agg-1.jar
      2. On the TM, place the same jar at <flink>/usrlib/window-agg-2.jar
      3. Start the job in application mode
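
      Until the error reporting improves, a mismatch like this could be caught before submission by comparing the jar file names found in the two usrlib directories. The following is a minimal sketch of such a check; the class UsrlibNameCheck and its method are hypothetical helpers, not part of Flink, and the file-name listings are assumed to have been collected from each host by other means.

```java
import java.util.Set;
import java.util.TreeSet;

public class UsrlibNameCheck {

    /**
     * Returns the jar names that appear in exactly one of the two listings
     * (the symmetric difference), sorted alphabetically.
     */
    public static Set<String> mismatchedNames(Set<String> jmJars, Set<String> tmJars) {
        Set<String> result = new TreeSet<>(jmJars);
        result.removeAll(tmJars);            // names only on the JM

        Set<String> onlyOnTm = new TreeSet<>(tmJars);
        onlyOnTm.removeAll(jmJars);          // names only on the TM

        result.addAll(onlyOnTm);
        return result;
    }

    public static void main(String[] args) {
        // File names taken from the reproduction steps above.
        Set<String> jm = Set.of("window-agg-1.jar");
        Set<String> tm = Set.of("window-agg-2.jar");

        Set<String> mismatched = mismatchedNames(jm, tm);
        if (!mismatched.isEmpty()) {
            System.out.println("usrlib contents differ between JM and TM: " + mismatched);
        }
    }
}
```

      An empty result means both sides list the same file names; it says nothing about whether the jar contents are identical.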

      People

        Assignee: Unassigned
        Reporter: Nico Kruber (nkruber)