Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Incomplete
- Affects Versions: 1.6.3, 2.0.2, 2.1.0, 2.3.0
- Fix Versions: None
Description
I run into weird behavior in the spark-shell when using paste mode after the following steps:
- Define a UDF
- Apply the UDF to get a Column
- Use that Column in a DataFrame

To reproduce, paste the following into the spark-shell in :paste mode:
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(
  ("hi", 1),
  ("there", 2),
  ("the", 3),
  ("end", 4)
)).toDF("a", "b")

val myNumbers = Set(1, 2, 3)
val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }
val rowHasMyNumber = tmpUDF($"b")
df.where(rowHasMyNumber).show()
Stack trace for Spark 2.0 (similar for other versions):
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
  at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
  at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
  at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
  at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
  at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
  - object not serializable (class: org.apache.spark.sql.Column, value: UDF(b))
  - field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, name: rowHasMyNumber, type: class org.apache.spark.sql.Column)
  - object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw@6688375a)
  - field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, name: $outer, type: class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw)
  - object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, <function1>)
  - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
  - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
  - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
  - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[1, int, false]))
  - element of array (index: 1)
  - array (class [Ljava.lang.Object;, size 2)
  - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
  - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
  at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
  at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
  at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
  at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
  at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
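The serialization stack suggests what is going on: in :paste mode the REPL compiles the whole paste into a single wrapper object, the UDF's lambda references myNumbers through that wrapper ($outer), and serializing the wrapper drags along its rowHasMyNumber field, which is an org.apache.spark.sql.Column and not serializable. A minimal workaround sketch, assuming the usual "copy captured values into a local val" trick (the name localNumbers is illustrative, not part of the original report):

import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(
  ("hi", 1), ("there", 2), ("the", 3), ("end", 4)
)).toDF("a", "b")

val myNumbers = Set(1, 2, 3)

// Copy the captured set into a local val inside a block, so the lambda
// closes over localNumbers directly instead of the REPL wrapper object
// (whose non-serializable Column field triggers the exception above).
val tmpUDF = {
  val localNumbers = myNumbers
  udf { (n: Int) => localNumbers.contains(n) }
}

val rowHasMyNumber = tmpUDF($"b")
df.where(rowHasMyNumber).show()

For a literal membership test like this one, Column.isin should also sidestep UDF closure serialization entirely: df.where($"b".isin(myNumbers.toSeq: _*)).show().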