Description
In DPP,we trigger NPE,like blow:
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.execution.DataSourceScanExec.$init$(DataSourceScanExec.scala:57)
at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:172)
...
at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:56)
at org.apache.spark.sql.catalyst.expressions.Predicate$.create(predicates.scala:101)
at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$2(basicPhysicalOperators.scala:246)
at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$2$adapted(basicPhysicalOperators.scala:245)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885)
,the root cause is addExprTree funtion in EquivalentExpressions:
```
def addExprTree(
expr: Expression,
addFunc: Expression => Boolean = addExpr): Unit = {
val skip = expr.isInstanceOf[LeafExpression] ||
// `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the
// loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning.
expr.find(_.isInstanceOf[LambdaVariable]).isDefined ||
// `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor,
// can cause error like NPE.
(expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null)
if (!skip && !addFunc(expr)) {
childrenToRecurse(expr).foreach(addExprTree(_, addFunc))
commonChildrenToRecurse(expr).filter(.nonEmpty).foreach(addCommonExprs(, addFunc))
```
maybe we should change it like this :
```
(expr.find(_.isInstanceOf[PlanExpression[_]]).isDefined && TaskContext.get != null)
```
because, in DPP,the filter expression like this:
DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)
so, we should iterator children, if PlanExpression found, such as InSubqueryExec, we should skip addExprTree, then NPE will not appears