SPARK-19039

UDF ClosureCleaner bug when UDF, col applied in paste mode in REPL


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 1.6.3, 2.0.2, 2.1.0, 2.3.0
    • Fix Version/s: None
    • Component/s: SQL

    Description

      When I do the following in the spark-shell:

      • Define a UDF
      • Apply the UDF to get a Column
      • Use that Column in a DataFrame

      the query fails with "Task not serializable", but only when the code is entered via paste mode (:paste).

      To reproduce, paste the following into the spark-shell:

      import org.apache.spark.sql.functions._

      val df = spark.createDataFrame(Seq(
        ("hi", 1),
        ("there", 2),
        ("the", 3),
        ("end", 4)
      )).toDF("a", "b")

      // The UDF's closure captures myNumbers from the enclosing paste-block scope.
      val myNumbers = Set(1, 2, 3)
      val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }

      // Binding the resulting Column to a val in the same paste block is what the
      // serialization stack below points at (field rowHasMyNumber, type Column).
      val rowHasMyNumber = tmpUDF($"b")
      df.where(rowHasMyNumber).show()
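
      A hedged variant, continuing the same session: applying the UDF inline, without binding the intermediate Column to a val, should leave the paste-block wrapper object with no non-serializable Column field for the closure to drag along. This is my reading of the serialization stack below, not a confirmed fix:

      // Same df and tmpUDF as above, but the UDF is applied inline: there is
      // no `rowHasMyNumber: Column` val, so the REPL wrapper reached via the
      // closure's $outer holds only serializable fields.
      df.where(tmpUDF($"b")).show()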
      

      Stack trace for Spark 2.0 (similar for other versions):

      org.apache.spark.SparkException: Task not serializable
      	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
      	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
      	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
      	at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
      	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
      	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
      	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
      	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
      	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
      	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
      	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
      	at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
      	at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
      	at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
      	at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
      	at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
      	at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
      	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
      	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
      	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
      	at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
      	at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
      	at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
      Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
      Serialization stack:
      	- object not serializable (class: org.apache.spark.sql.Column, value: UDF(b))
      	- field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, name: rowHasMyNumber, type: class org.apache.spark.sql.Column)
      	- object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw@6688375a)
      	- field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, name: $outer, type: class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw)
      	- object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, <function1>)
      	- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
      	- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
      	- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
      	- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[1, int, false]))
      	- element of array (index: 1)
      	- array (class [Ljava.lang.Object;, size 2)
      	- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
      	- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
      	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
      	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
      	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
      	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
      	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
      	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
      	at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
      	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
      	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
      	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
      	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
      	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
      	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
      	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
      	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
      	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
      	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
      	at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
      	at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
      	at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
      	at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
      	at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
      	at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
      	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
      	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
      	at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
      	at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
      	at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
      	at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
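
      Reading the serialization stack: the UDF closure (...$$anonfun$1) holds an $outer reference to the REPL wrapper object for the paste block, and that wrapper in turn holds the rowHasMyNumber field of type org.apache.spark.sql.Column, which is not serializable, so serializing the task closure fails. Another hedged workaround sketch, following the usual Spark advice of copying captured state into a local val so the lambda never needs an $outer pointer at all (the names with a 2 suffix are mine, for illustration, and this is an assumption rather than a verified fix):

      import org.apache.spark.sql.functions._

      // Build the UDF inside a block: the lambda captures only the local
      // (serializable) Set, not the paste-block wrapper via $outer.
      val tmpUDF2 = {
        val localNumbers = Set(1, 2, 3)
        udf { (n: Int) => localNumbers.contains(n) }
      }

      val rowHasMyNumber2 = tmpUDF2($"b")
      df.where(rowHasMyNumber2).show()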
      

People

    Assignee: Unassigned
    Reporter: Joseph K. Bradley (josephkb)
    Votes: 0
    Watchers: 6
