Description
Here is the code to reproduce the issue in local mode:
scala> val df = sc.range(1, 2).toDF
df: org.apache.spark.sql.DataFrame = [value: bigint]

scala> val myudf = udf({x: Long => println("xxxx"); x + 1})
myudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(LongType)))

scala> val df1 = df.withColumn("value1", myudf(col("value")))
df1: org.apache.spark.sql.DataFrame = [value: bigint, value1: bigint]

scala> df1.cache
res0: df1.type = [value: bigint, value1: bigint]

scala> df1.count
res1: Long = 1

scala> df1.count
res2: Long = 1

scala> df1.count
res3: Long = 1
In Spark 2.2, you could see it print "xxxx".
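As a side note, one way to make the UDF executions observable without relying on console output is to count invocations with an accumulator. This is only an illustrative sketch, assuming a running spark-shell (so `spark` and `sc` are in scope); the accumulator name and variable names are not part of the report:

```scala
import org.apache.spark.sql.functions.{col, udf}

// Accumulator to count how many times the UDF body actually runs.
val calls = spark.sparkContext.longAccumulator("udfCalls")
val counting = udf { x: Long => calls.add(1); x + 1 }

val df1 = sc.range(1, 2).toDF.withColumn("value1", counting(col("value")))
df1.cache()

df1.count()
println(s"UDF calls after first count: ${calls.value}")
df1.count()
println(s"UDF calls after second count: ${calls.value}")
```

If the cached data were used for the second count, `calls.value` should not increase between the two counts.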
In the above example, when you run explain(), you can see:
scala> df1.explain(true)
== Parsed Logical Plan ==
'Project [value#2L, UDF('value) AS value1#5]
+- AnalysisBarrier
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
value: bigint, value1: bigint
Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
+- SerializeFromObject [input[0, bigint, false] AS value#2L]
   +- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
      +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
         +- Scan ExternalRDDScan[obj#1L]

== Physical Plan ==
*(1) InMemoryTableScan [value#2L, value1#5L]
   +- InMemoryRelation [value#2L, value1#5L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *(1) Project [value#2L, UDF(value#2L) AS value1#5L]
         +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
            +- Scan ExternalRDDScan[obj#1L]
but the InMemoryTableScan is missing in the following explain():
scala> df1.groupBy().count().explain(true)
== Parsed Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#170L]
+- Project [value#2L, if (isnull(value#2L)) null else if (isnull(value#2L)) null else UDF(value#2L) AS value1#5L]
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Optimized Logical Plan ==
Aggregate [count(1) AS count#170L]
+- Project
   +- SerializeFromObject [input[0, bigint, false] AS value#2L]
      +- ExternalRDD [obj#1L]

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#170L])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#175L])
      +- *(1) Project
         +- *(1) SerializeFromObject [input[0, bigint, false] AS value#2L]
            +- Scan ExternalRDDScan[obj#1L]
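Beyond reading the explain output by eye, the presence of a cache read can also be checked programmatically by collecting InMemoryTableScanExec nodes from the executed physical plan. This is a sketch, assuming the `df1` from the reproduction above and a Spark 2.x shell session:

```scala
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

// Collect any in-memory table scans from the physical plan of the aggregate query.
val usesCache = df1.groupBy().count()
  .queryExecution.executedPlan
  .collect { case scan: InMemoryTableScanExec => scan }
  .nonEmpty

// With this bug present, usesCache is false even though df1 is cached.
println(s"aggregate query reads the cache: $usesCache")
```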