SPARK-31399

Closure cleaner broken in Scala 2.12


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.4.5, 3.0.0
    • Fix Version/s: 2.4.6, 3.0.0
    • Component/s: Spark Core
    • Labels: None

    Description

      The `ClosureCleaner` only supports Scala functions, and it uses the following check to catch closures:

        // Check whether a class represents a Scala closure
        private def isClosure(cls: Class[_]): Boolean = {
          cls.getName.contains("$anonfun$")
        }
      

      This no longer works in 3.0: with the upgrade to Scala 2.12, most Scala functions are compiled to Java lambdas, whose runtime class names no longer contain `$anonfun$`.
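
      A quick way to see why the name check stops matching (a standalone sketch, not taken from the original report): under Scala 2.12 a function literal becomes an invokedynamic-generated Java lambda, so its runtime class name looks like `Foo$$$Lambda$<n>/<id>` instead of containing `$anonfun$`.

        // Illustrative only; compare the output under Scala 2.11 and 2.12.
        object LambdaNameDemo {
          def main(args: Array[String]): Unit = {
            val f = (x: Int) => x + 1
            // Scala 2.11 prints an anonymous-class name like "LambdaNameDemo$$anonfun$main$1";
            // Scala 2.12 prints a JDK-generated name like "LambdaNameDemo$$$Lambda$1/1705736037".
            println(f.getClass.getName)
            // Hence the isClosure() check above returns false on 2.12.
            println(f.getClass.getName.contains("$anonfun$"))
          }
        }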

      As an example, the following code works in the Spark 2.4 shell (the default Scala 2.11 build):

      scala> :pa
      // Entering paste mode (ctrl-D to finish)
      
      import org.apache.spark.sql.functions.lit
      
      case class Foo(id: String)
      val col = lit("123")
      val df = sc.range(0,10,1,1).map { _ => Foo("") }
      
      // Exiting paste mode, now interpreting.
      
      import org.apache.spark.sql.functions.lit
      defined class Foo
      col: org.apache.spark.sql.Column = 123
      df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20
      

      But it fails in 3.0:

      scala> :pa
      // Entering paste mode (ctrl-D to finish)
      
      import org.apache.spark.sql.functions.lit
      
      case class Foo(id: String)
      val col = lit("123")
      val df = sc.range(0,10,1,1).map { _ => Foo("") }
      
      // Exiting paste mode, now interpreting.
      
      org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
        at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.map(RDD.scala:421)
        ... 39 elided
      Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
      Serialization stack:
      	- object not serializable (class: org.apache.spark.sql.Column, value: 123)
      	- field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
      	- object (class $iw, $iw@2d87ac2b)
      	- element of array (index: 0)
      	- array (class [Ljava.lang.Object;, size 1)
      	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
      	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
      	- writeReplace data (class: java.lang.invoke.SerializedLambda)
      	- object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
        ... 47 more
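
      The `SerializedLambda` entry in the serialization stack also hints at how such closures can be recognized at all: serializable lambdas generated through `LambdaMetafactory` carry a `writeReplace` method that returns a `java.lang.invoke.SerializedLambda`, which exposes the capturing class and the captured arguments. A minimal detection sketch (illustrative only; the helper name `asSerializedLambda` is ours, and this is not necessarily how the eventual fix works):

        import java.lang.invoke.SerializedLambda

        // Returns the SerializedLambda form of a Java-lambda closure, or None if the
        // object is not such a lambda (e.g. a plain Scala 2.11 anonymous function class).
        def asSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
          if (!closure.getClass.isSynthetic) {
            None
          } else {
            try {
              val m = closure.getClass.getDeclaredMethod("writeReplace")
              m.setAccessible(true)
              m.invoke(closure) match {
                case sl: SerializedLambda => Some(sl)
                case _ => None
              }
            } catch {
              case _: NoSuchMethodException => None
            }
          }
        }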
      

      The same failure also reproduces in *Apache Spark 2.4.5 with Scala 2.12*, which shows the problem comes from the Scala 2.12 lambda encoding rather than from Spark 3.0 itself:

      Welcome to
            ____              __
           / __/__  ___ _____/ /__
          _\ \/ _ \/ _ `/ __/  '_/
         /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
            /_/
      
      Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
      Type in expressions to have them evaluated.
      Type :help for more information.
      
      scala> :pa
      // Entering paste mode (ctrl-D to finish)
      
      import org.apache.spark.sql.functions.lit
      
      case class Foo(id: String)
      val col = lit("123")
      val df = sc.range(0,10,1,1).map { _ => Foo("") }
      
      // Exiting paste mode, now interpreting.
      
      org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
        at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:393)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.map(RDD.scala:392)
        ... 45 elided
      Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
      Serialization stack:
      	- object not serializable (class: org.apache.spark.sql.Column, value: 123)
      	- field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
      	- object (class $iw, $iw@73534675)
      	- element of array (index: 0)
      	- array (class [Ljava.lang.Object;, size 1)
      	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
      	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
      	- writeReplace data (class: java.lang.invoke.SerializedLambda)
      	- object (class $Lambda$1952/356563238, $Lambda$1952/356563238@6ca95b1e)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
        ... 53 more
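
      As a quick diagnostic (a sketch, not part of the original report; the helper name `isJavaSerializable` is ours), the check that `ClosureCleaner.ensureSerializable` performs can be reproduced without building an RDD by running plain Java serialization on the closure. For the repro above it fails because the lambda's `capturedArgs` include the REPL `$iw` wrapper holding the non-serializable `Column`.

        import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

        // True iff the closure, plus everything it captures, survives Java serialization.
        def isJavaSerializable(closure: AnyRef): Boolean =
          try {
            val oos = new ObjectOutputStream(new ByteArrayOutputStream())
            oos.writeObject(closure)
            oos.close()
            true
          } catch {
            case _: NotSerializableException => false
          }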
      

People

    • Assignee: Kris Mok (rednaxelafx)
    • Reporter: Wenchen Fan (cloud_fan)
    • Votes: 0
    • Watchers: 17
