Spark / SPARK-40380

Constant-folding of InvokeLike should not result in non-serializable result


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.3.1, 3.4.0
    • Component/s: SQL
    • Labels: None

    Description

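      SPARK-37907 added constant-folding support to the InvokeLike family of expressions. Unfortunately, it introduced a regression for cases where a constant-folded InvokeLike expression returns a non-serializable result. Folding replaces the expression with a Literal that holds the result object, and that object then has to be Java-serialized as part of the closure that Spark ships to executors. ExpressionEncoders are one area where this problem may surface, e.g. when using sparksql-scalapb on Spark 3.3.0+.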
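
      A toy model in plain Scala can make the folding step concrete. It folds any call whose inputs are all constants into a literal, but only when the computed value looks serializable, which is the property the issue title asks for. Everything here (Expr, Lit, RowId, Call, ToyConstantFolding, ToyDemo) is hypothetical and not Catalyst's API. The value-based check is a simplification and not necessarily how this issue was fixed.

```scala
// Hypothetical toy model for illustration only; these are NOT Catalyst classes.
sealed trait Expr {
  def foldable: Boolean
  def eval(row: Long): Any
}

// A constant value: always foldable.
final case class Lit(value: Any) extends Expr {
  def foldable: Boolean = true
  def eval(row: Long): Any = value
}

// The per-row input (playing the role of the `id` column): never foldable.
case object RowId extends Expr {
  def foldable: Boolean = false
  def eval(row: Long): Any = row
}

// A method call loosely modeled on InvokeLike: foldable iff all children are.
final case class Call(target: Expr, args: Seq[Expr], f: (Any, Seq[Any]) => Any) extends Expr {
  def foldable: Boolean = (target +: args).forall(_.foldable)
  def eval(row: Long): Any = f(target.eval(row), args.map(_.eval(row)))
}

object ToyConstantFolding {
  // Simplified guard: only embed values whose runtime class implements
  // java.io.Serializable. The marker alone does not prove the whole object
  // graph serializes, so a real engine may need a stricter rule.
  private def safeToEmbed(v: Any): Boolean =
    v == null || v.isInstanceOf[java.io.Serializable]

  def fold(e: Expr): Expr = e match {
    case c: Call if c.foldable =>
      val v = c.eval(0L) // a foldable tree never reads the row, so any value works
      if (safeToEmbed(v)) Lit(v)
      else c.copy(target = fold(c.target), args = c.args.map(fold))
    case c: Call => c.copy(target = fold(c.target), args = c.args.map(fold))
    case other => other
  }
}

// Same shapes as the classes in the repro below.
class NotSerializableBoxedLong(longVal: Long) { def add(other: Long): Long = longVal + other }
final case class SerializableBoxedLong(longVal: Long) {
  def toNotSerializable(): NotSerializableBoxedLong = new NotSerializableBoxedLong(longVal)
}

object ToyDemo {
  val toNotSerializable: Call =
    Call(Lit(SerializableBoxedLong(42L)), Nil, (t, _) => t.asInstanceOf[SerializableBoxedLong].toNotSerializable())
  val add: Call =
    Call(toNotSerializable, Seq(RowId), (t, as) => t.asInstanceOf[NotSerializableBoxedLong].add(as.head.asInstanceOf[Long]))
  // `add` reads the row, so only its target is a folding candidate; the guard
  // refuses to turn the NotSerializableBoxedLong into a literal.
  val folded: Expr = ToyConstantFolding.fold(add)
}
```

      In ToyDemo, add reads the row, so only its toNotSerializable() target is a folding candidate. The guard keeps that call in the tree, so folded.eval(0L) and folded.eval(1L) still yield 42 and 43, matching the expected [[42], [43]]. Dropping the guard would embed the NotSerializableBoxedLong as a literal, which is the toy analogue of what fails in Spark.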

      Below is a minimal repro to demonstrate this issue:

      // Paste into spark-shell (Spark 3.3.0), where `spark` is predefined.
      import org.apache.spark.sql.Column
      import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
      import org.apache.spark.sql.catalyst.expressions.Literal
      import org.apache.spark.sql.catalyst.expressions.objects.Invoke
      import org.apache.spark.sql.types.{LongType, ObjectType}

      // Deliberately does not extend Serializable.
      class NotSerializableBoxedLong(longVal: Long) { def add(other: Long): Long = longVal + other }
      // Case classes are Serializable; this one hands out a non-serializable wrapper.
      case class SerializableBoxedLong(longVal: Long) { def toNotSerializable(): NotSerializableBoxedLong = new NotSerializableBoxedLong(longVal) }

      // Object-typed literal holding a serializable value.
      val litExpr = Literal.fromObject(SerializableBoxedLong(42L), ObjectType(classOf[SerializableBoxedLong]))
      // All inputs are foldable, so after SPARK-37907 this Invoke can be constant-folded
      // into a Literal whose value is a NotSerializableBoxedLong.
      val toNotSerializableExpr = Invoke(litExpr, "toNotSerializable", ObjectType(classOf[NotSerializableBoxedLong]))
      // Depends on the `id` column, so it is not folded and runs per row.
      val addExpr = Invoke(toNotSerializableExpr, "add", LongType, Seq(UnresolvedAttribute.quotedString("id")))
      val df = spark.range(2).select(new Column(addExpr))
      df.collect
      

      Before SPARK-37907, this example would run fine and result in [[42], [43]]. But after SPARK-37907, it'd fail with:

      ...
      Caused by: java.io.NotSerializableException: NotSerializableBoxedLong
      Serialization stack:
      	- object not serializable (class: NotSerializableBoxedLong, value: NotSerializableBoxedLong@71231636)
      	- element of array (index: 1)
      	- array (class [Ljava.lang.Object;, size 2)
      	- element of array (index: 1)
      	- array (class [Ljava.lang.Object;, size 3)
      	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
      	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
      	- writeReplace data (class: java.lang.invoke.SerializedLambda)
      	- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$3123/1641694389@185db22c)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
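
      The serialization stack can be read as follows. The closure being checked is a WholeStageCodegenExec lambda with numCaptured=3, and its capturedArgs (a CodeAndComment, an Object[] and an SQLMetric, per the lambda's implementation signature) form the size-3 array. Index 1 of that array is the size-2 Object[] whose element at index 1 is the NotSerializableBoxedLong; this appears to be the generated code's references array, where an object-typed Literal's value is stored. The sketch below reproduces just that last step with plain JDK serialization. The array contents and the names ReferencesSerializationSketch and trySerialize are hypothetical, not Spark code.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ReferencesSerializationSketch {
  // Mirrors the repro's class: it does not extend Serializable.
  class NotSerializableBoxedLong(longVal: Long) {
    def add(other: Long): Long = longVal + other
  }

  // Java-serializes `obj` (Spark's closure check also goes through Java
  // serialization) and reports either the byte count or the failure.
  def trySerialize(obj: AnyRef): Either[NotSerializableException, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e)
    } finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for the generated code's references array:
    // slot 0 holds something serializable, slot 1 the folded object.
    val references: Array[AnyRef] = Array("serializable entry", new NotSerializableBoxedLong(42L))
    trySerialize(references) match {
      case Left(e)  => println(s"serialization failed: $e")
      case Right(n) => println(s"serialized $n bytes")
    }
  }
}
```

      Serializing the array fails as soon as the writer reaches the non-serializable element, even though the array itself and its other entries are serializable. This is why a single folded object is enough to break the whole stage.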
      

People

    • Assignee: Kris Mok (rednaxelafx)
    • Reporter: Kris Mok (rednaxelafx)
    • Votes: 0
    • Watchers: 4