Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 2.4.3, 3.0.1, 3.2.0
Environment:
Spark 2.4.3 Scala 2.12
Spark 3.2.0 Scala 2.13.5 (Java 11.0.12)
Description
Note: this is for Scala 2.12.
There seems to be an issue in Spark with serializing a UDF created from a function value stored in a member val, when that function references another function value stored in a member val. This is similar to https://issues.apache.org/jira/browse/SPARK-25047, but the resolution of that issue does not seem to handle this case. After trimming it down to the base issue, I came up with the following reproduction:
// Run in spark-shell, which already provides sc and imports
// spark.implicits._ and org.apache.spark.sql.functions._
object TestLambdaShell extends Serializable {
  val hello: String => String = s => s"hello $s!"
  val lambdaTest: String => String = hello(_)
  def functionTest: String => String = hello(_)
}

val hello = udf(TestLambdaShell.hello)
val functionTest = udf(TestLambdaShell.functionTest)
val lambdaTest = udf(TestLambdaShell.lambdaTest)

sc.parallelize(Seq("world"), 1).toDF("test").select(hello($"test")).show(1)
sc.parallelize(Seq("world"), 1).toDF("test").select(functionTest($"test")).show(1)
sc.parallelize(Seq("world"), 1).toDF("test").select(lambdaTest($"test")).show(1)
All of these work except the last line, which fails on the executors with the following exception:
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field $$$82b5b23cea489b2712a1db46c77e458$$$$w$TestLambdaShell$.lambdaTest of type scala.Function1 in instance of $$$82b5b23cea489b2712a1db46c77e458$$$$w$TestLambdaShell$
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
In Spark 2.2.x I used a class with something like this, and it worked fine. Now that we have upgraded to Scala 2.12, we have run into a few serialization issues, most of which were solved by extending Serializable; this case, however, was not fixed by that.
This happens regardless of whether the code runs in the shell or from a jar.
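The exception itself can be illustrated without Spark. The following is a minimal sketch, assuming that Spark's JavaSerializer behaves like a plain ObjectOutputStream/ObjectInputStream round trip; the names Holder, roundTrip and SerializedLambdaCycle are hypothetical. A Scala 2.12 lambda is written out as a java.lang.invoke.SerializedLambda and only turned back into a Function1 by readResolve after all of its own fields have been read. If the lambda captures an object whose field points back at that same lambda, the back-reference is resolved while the lambda is still a SerializedLambda, and assigning it to the Function1 field fails, even though everything extends Serializable. This is offered as one possible mechanism, not a confirmed root cause for this report.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for TestLambdaShell. Using a class (not an object)
// guarantees that `hello(_)` must capture `this` to reach the `hello` field.
class Holder extends Serializable {
  val hello: String => String = s => s"hello $s!"
  // Captures `this`, while `this.lambdaTest` points back at this same lambda: a cycle.
  val lambdaTest: String => String = hello(_)
}

object SerializedLambdaCycle {
  // Plain Java serialization round trip, standing in for Spark's JavaSerializer.
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val h = new Holder
    try {
      roundTrip(h.lambdaTest)
      println("round trip succeeded")
    } catch {
      // Reading the captured Holder reaches its lambdaTest field, which refers back
      // to the lambda that is still an unresolved SerializedLambda at that point.
      case e: ClassCastException => println(s"ClassCastException: ${e.getMessage}")
    }
  }
}
```

In the reproduction above, lambdaTest is the only function that is both stored in a field of TestLambdaShell and (if compiled as a lambda that captures the object) refers back to it. Whether the compiler makes such a lambda capture the enclosing object may depend on the Scala version and on the REPL's wrapper classes, which could be related to the version dependence described in this report.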
After much more debugging, this turns out to be caused by a mix of Scala 2.12.0 and Scala 2.12.8. Spark is compiled with 2.12.8, and so is our own code, but I noticed that the class compiled by Maven did not match the class compiled by running the 2.12.8 scalac directly. After a lot of digging, we realized that scala-compiler indirectly depends on scala-library 2.12.0, and for some reason the build only starts using that version once the Spark dependency is added. Without the Spark dependency, with only direct Scala 2.12.8 dependencies, the code builds and compiles correctly against 2.12.8.
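Since the problem hinges on which scala-library version ends up in use, a quick runtime check can help narrow it down. A minimal sketch, assuming it is pasted into the Spark shell or a Zeppelin paragraph (or run as a small program); the object name ScalaLibraryCheck is hypothetical. It only reports the library loaded at runtime, which is not necessarily the version the code was compiled against.

```scala
object ScalaLibraryCheck {
  // Version of the scala-library that is actually loaded at runtime
  // (read from the library.properties file bundled with that library).
  def libraryVersion: String = scala.util.Properties.versionNumberString

  // Jar or directory that provided scala.Function1, if the class loader exposes it.
  def libraryLocation: Option[String] =
    Option(classOf[scala.Function1[_, _]].getProtectionDomain.getCodeSource)
      .flatMap(source => Option(source.getLocation))
      .map(_.toString)

  def main(args: Array[String]): Unit = {
    println(s"scala-library version: $libraryVersion")
    println(s"loaded from: ${libraryLocation.getOrElse("<unknown>")}")
  }
}
```

To see what the executors load, the same property can be read inside a task, for example `sc.parallelize(1 to 1).map(_ => scala.util.Properties.versionNumberString).collect()`.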
We were able to fix this by setting the following scala-maven-plugin options:
<failOnMultipleScalaVersions>true</failOnMultipleScalaVersions>
<scalaCompatVersion>2.12</scalaCompatVersion>
<scalaVersion>2.12.8</scalaVersion>
This resolves the issue for the jars we build ourselves and link against Spark. However, my original test case still reproduces in the Spark shell, and for us also in Apache Zeppelin, so it almost seems as if they are also compiling the code with 2.12.0, though I'm not sure how. Spark's pom.xml appears to enable failOnMultipleScalaVersions and compiles fine, so I'm not sure how this is happening, but at least the problem is more isolated now. I'm also wondering whether anything else could be affected by this.