Description
For some expressions, dataType is not a precomputed static value: it is recalculated on every call. For example:
spark.range(100000000L).selectExpr("approx_count_distinct(case when id % 400 > 20 then id else 0 end)").show
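In this query, approx_count_distinct is evaluated by HyperLogLogPlusPlus, whose update method reads the data type of its CaseWhen child once per input row, and each read re-runs the whole type-merging check. Below is a self-contained sketch of the shape of the problem (plain Scala, no Spark dependency; the names mirror the frames in the profile further down, but every body is a simplified stand-in, not Catalyst source):

object DataTypePerCall extends App {
  sealed trait DataType
  case object LongType extends DataType

  var confReads = 0L
  // Stand-in for SQLConf.get.caseSensitiveAnalysis: a config lookup paid on
  // every type comparison.
  def caseSensitiveAnalysis: Boolean = { confReads += 1; false }

  // Stand-in for TypeCoercion.haveSameType / DataType.sameType.
  def haveSameType(types: Seq[DataType]): Boolean =
    types.forall { t => caseSensitiveAnalysis; t == types.head }

  trait Expression { def dataType: DataType }

  // Like CaseWhen via ComplexTypeMergingExpression: dataType is a def, so
  // the full check chain re-runs on every call.
  final class CaseWhenLike(branchTypes: Seq[DataType]) extends Expression {
    def dataTypeCheck(): Unit = require(haveSameType(branchTypes))
    override def dataType: DataType = { dataTypeCheck(); branchTypes.head }
  }

  // Stand-in for HyperLogLogPlusPlus.update: touches child.dataType per row.
  val child = new CaseWhenLike(Seq(LongType, LongType))
  (1 to 1000000).foreach(_ => child.dataType)
  println(s"config lookups for 1e6 rows: $confReads") // ~2 million
}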
Profile result:
--- Execution profile ---
Total samples       : 18365
Frame buffer usage  : 2.6688%

--- 58443254327 ns (31.82%), 5844 samples
  [ 0] GenericTaskQueueSet<OverflowTaskQueue<StarTask, (MemoryType)1, 131072u>, (MemoryType)1>::steal_best_of_2(unsigned int, int*, StarTask&)
  [ 1] StealTask::do_it(GCTaskManager*, unsigned int)
  [ 2] GCTaskThread::run()
  [ 3] java_start(Thread*)
  [ 4] start_thread

--- 6140668667 ns (3.34%), 614 samples
  [ 0] GenericTaskQueueSet<OverflowTaskQueue<StarTask, (MemoryType)1, 131072u>, (MemoryType)1>::peek()
  [ 1] ParallelTaskTerminator::offer_termination(TerminatorTerminator*)
  [ 2] StealTask::do_it(GCTaskManager*, unsigned int)
  [ 3] GCTaskThread::run()
  [ 4] java_start(Thread*)
  [ 5] start_thread

--- 5679994036 ns (3.09%), 568 samples
  [ 0] scala.collection.generic.Growable.$plus$plus$eq
  [ 1] scala.collection.generic.Growable.$plus$plus$eq$
  [ 2] scala.collection.mutable.ListBuffer.$plus$plus$eq
  [ 3] scala.collection.mutable.ListBuffer.$plus$plus$eq
  [ 4] scala.collection.generic.GenericTraversableTemplate.$anonfun$flatten$1
  [ 5] scala.collection.generic.GenericTraversableTemplate$$Lambda$107.411506101.apply
  [ 6] scala.collection.immutable.List.foreach
  [ 7] scala.collection.generic.GenericTraversableTemplate.flatten
  [ 8] scala.collection.generic.GenericTraversableTemplate.flatten$
  [ 9] scala.collection.AbstractTraversable.flatten
  [10] org.apache.spark.internal.config.ConfigEntry.readString
  [11] org.apache.spark.internal.config.ConfigEntryWithDefault.readFrom
  [12] org.apache.spark.sql.internal.SQLConf.getConf
  [13] org.apache.spark.sql.internal.SQLConf.caseSensitiveAnalysis
  [14] org.apache.spark.sql.types.DataType.sameType
  [15] org.apache.spark.sql.catalyst.analysis.TypeCoercion$.$anonfun$haveSameType$1
  [16] org.apache.spark.sql.catalyst.analysis.TypeCoercion$.$anonfun$haveSameType$1$adapted
  [17] org.apache.spark.sql.catalyst.analysis.TypeCoercion$$$Lambda$1527.1975399904.apply
  [18] scala.collection.IndexedSeqOptimized.prefixLengthImpl
  [19] scala.collection.IndexedSeqOptimized.forall
  [20] scala.collection.IndexedSeqOptimized.forall$
  [21] scala.collection.mutable.ArrayBuffer.forall
  [22] org.apache.spark.sql.catalyst.analysis.TypeCoercion$.haveSameType
  [23] org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck
  [24] org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck$
  [25] org.apache.spark.sql.catalyst.expressions.CaseWhen.dataTypeCheck
  [26] org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType
  [27] org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType$
  [28] org.apache.spark.sql.catalyst.expressions.CaseWhen.dataType
  [29] org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus.update
  [30] org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$2
  [31] org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$2$adapted
  [32] org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$Lambda$1534.1383512673.apply
  [33] org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7
  [34] org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7$adapted
  [35] org.apache.spark.sql.execution.aggregate.AggregationIterator$$Lambda$1555.725788712.apply
  [36] org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs
  [37] org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>
  [38] org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$2
  [39] org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$2$adapted
  [40] org.apache.spark.sql.execution.aggregate.HashAggregateExec$$Lambda$1459.1481387816.apply
  [41] org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2
  [42] org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted
  [43] org.apache.spark.rdd.RDD$$Lambda$683.57311983.apply
  [44] org.apache.spark.rdd.MapPartitionsRDD.compute
  [45] org.apache.spark.rdd.RDD.computeOrReadCheckpoint
  [46] org.apache.spark.rdd.RDD.iterator
  [47] org.apache.spark.rdd.MapPartitionsRDD.compute
  [48] org.apache.spark.rdd.RDD.computeOrReadCheckpoint
  [49] org.apache.spark.rdd.RDD.iterator
  [50] org.apache.spark.scheduler.ResultTask.runTask
  [51] org.apache.spark.scheduler.Task.run
  [52] org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3
  [53] org.apache.spark.executor.Executor$TaskRunner$$Lambda$477.1129882178.apply
  [54] org.apache.spark.util.Utils$.tryWithSafeFinally
  [55] org.apache.spark.executor.Executor$TaskRunner.run
  [56] java.util.concurrent.ThreadPoolExecutor.runWorker
  [57] java.util.concurrent.ThreadPoolExecutor$Worker.run
  [58] java.lang.Thread.run
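The first two stacks are GC task threads; the third (5679994036 ns, 3.09%) is the one that matters here. HyperLogLogPlusPlus.update (frame [29]) reaches CaseWhen.dataType (frame [28]), and every call walks through dataTypeCheck and TypeCoercion.haveSameType (frames [22]-[27]) down to DataType.sameType, SQLConf.caseSensitiveAnalysis, and SQLConf.getConf (frames [12]-[14]); a config lookup is therefore paid for every input row. One possible mitigation, sketched against the toy model above rather than against Catalyst itself (internalDataType is an illustrative name, not necessarily what an actual fix would use), is to compute the merged type once and cache it:

  // Added to the same object as the sketch above: identical check, but the
  // merged type is computed once and then served from a cache.
  final class CachedCaseWhenLike(branchTypes: Seq[DataType]) extends Expression {
    def dataTypeCheck(): Unit = require(haveSameType(branchTypes))

    // The merged type cannot change after construction, so a lazy val is safe.
    @transient private lazy val internalDataType: DataType = {
      dataTypeCheck()
      branchTypes.head
    }

    override def dataType: DataType = internalDataType
  }

  confReads = 0
  val cached = new CachedCaseWhenLike(Seq(LongType, LongType))
  (1 to 1000000).foreach(_ => cached.dataType)
  println(s"config lookups with caching: $confReads") // 2: only the first call pays

The same idea applies to any expression whose dataType is derived from its children with a validity check: the check is still useful, but it only needs to run once per expression instance, not once per row.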