Description
SPARK-37907 added constant-folding support to the InvokeLike family of expressions. Unfortunately it introduced a regression for cases when a constant-folded InvokeLike expression returned a non-serializable result. {{ExpressionEncoder}}s is an area where this problem may be exposed, e.g. when using sparksql-scalapb on Spark 3.3.0+.
Below is a minimal repro to demonstrate this issue:
import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke} import org.apache.spark.sql.types.{LongType, ObjectType} class NotSerializableBoxedLong(longVal: Long) { def add(other: Long): Long = longVal + other } case class SerializableBoxedLong(longVal: Long) { def toNotSerializable(): NotSerializableBoxedLong = new NotSerializableBoxedLong(longVal) } val litExpr = Literal.fromObject(SerializableBoxedLong(42L), ObjectType(classOf[SerializableBoxedLong])) val toNotSerializableExpr = Invoke(litExpr, "toNotSerializable", ObjectType(classOf[NotSerializableBoxedLong])) val addExpr = Invoke(toNotSerializableExpr, "add", LongType, Seq(UnresolvedAttribute.quotedString("id"))) val df = spark.range(2).select(new Column(addExpr)) df.collect
Before SPARK-37907, this example would run fine and result in [[42], [43]]. But after SPARK-37907, it'd fail with:
... Caused by: java.io.NotSerializableException: NotSerializableBoxedLong Serialization stack: - object not serializable (class: NotSerializableBoxedLong, value: NotSerializableBoxedLong@71231636) - element of array (index: 1) - array (class [Ljava.lang.Object;, size 2) - element of array (index: 1) - array (class [Ljava.lang.Object;, size 3) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3]) - writeReplace data (class: java.lang.invoke.SerializedLambda) - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389@185db22c) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)