Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
4.0.0
Description
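We should avoid pushing down Unevaluable expressions (such as Python UDF calls). Once pushed down, they can end up being evaluated directly, for instance during partition pruning, and Unevaluable.eval always fails. For example, the code snippet below (assuming there is a table t with a partition column p)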
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
import pyspark.sql.functions as f

# In the pyspark shell `spark` already exists; create it otherwise.
spark = SparkSession.builder.getOrCreate()

def getdata(p: str) -> str:
    return "data"

NEW_COLUMN = 'new_column'
P_COLUMN = 'p'  # partition column of default.t

f_getdata = f.udf(getdata, StringType())
rows = spark.sql("select * from default.t")
table = rows.withColumn(NEW_COLUMN, f_getdata(f.col(P_COLUMN)))

# Self-join on the column computed by the Python UDF.
df = table.alias('t1').join(
    table.alias('t2'),
    f.col(f"t1.{NEW_COLUMN}") == f.col(f"t2.{NEW_COLUMN}"),
    how='inner')
df.show()
will cause an error like:
org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: getdata(input[0, string, true])#16
at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:66)
at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:391)
at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:390)
at org.apache.spark.sql.catalyst.expressions.PythonUDF.eval(PythonUDF.scala:71)
at org.apache.spark.sql.catalyst.expressions.IsNotNull.eval(nullExpressions.scala:384)
at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:52)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$1(ExternalCatalogUtils.scala:166)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$1$adapted(ExternalCatalogUtils.scala:165)
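The trace shows prunePartitionsByFilter calling InterpretedPredicate.eval on an IsNotNull predicate over the UDF call. That call reaches PythonUDF.eval, which is Unevaluable and throws.

The following is a toy Python model of that failure mode. It is not Spark code, and every class and function in it (Expr, Unevaluable, PythonUDFCall, prune_partitions, etc.) is hypothetical. The guard in prune_partitions only illustrates the principle of not pushing Unevaluable expressions into a driver-side evaluation step. It is not the actual change made for this issue.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]  # a partition's values, e.g. {"p": "a"}


class Expr:
    """Toy expression tree node (hypothetical, not Catalyst's API)."""

    def __init__(self, *children: "Expr") -> None:
        self.children = children

    def eval(self, row: Row) -> Any:
        raise NotImplementedError


class Unevaluable(Expr):
    """Marker for expressions that cannot be interpreted on the driver."""

    def eval(self, row: Row) -> Any:
        raise RuntimeError(
            f"Cannot evaluate expression: {type(self).__name__}")


class Col(Expr):
    def __init__(self, name: str) -> None:
        super().__init__()
        self.name = name

    def eval(self, row: Row) -> Any:
        return row[self.name]


class IsNotNull(Expr):
    def eval(self, row: Row) -> Any:
        return self.children[0].eval(row) is not None


class PythonUDFCall(Unevaluable):
    """Stands in for a Python UDF call such as getdata(p)."""


def is_evaluable(e: Expr) -> bool:
    """True if no node in the expression tree is Unevaluable."""
    return not isinstance(e, Unevaluable) and all(
        is_evaluable(c) for c in e.children)


def prune_partitions_naive(partitions: List[Row],
                           filters: List[Expr]) -> List[Row]:
    # Evaluates every pushed-down filter, so an Unevaluable node raises.
    return [p for p in partitions if all(cond.eval(p) for cond in filters)]


def prune_partitions(partitions: List[Row],
                     filters: List[Expr]) -> List[Row]:
    # Only fully evaluable filters are used for pruning. In this toy model
    # the others are simply not used here; a real engine would still apply
    # them after the scan.
    usable = [cond for cond in filters if is_evaluable(cond)]
    return [p for p in partitions if all(cond.eval(p) for cond in usable)]
```

Take partitions [{"p": "a"}, {"p": None}] and filters [IsNotNull(Col("p")), IsNotNull(PythonUDFCall(Col("p")))]:

- prune_partitions returns [{"p": "a"}].
- prune_partitions_naive raises RuntimeError, mirroring the IsNotNull.eval -> PythonUDF.eval frames in the trace.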