Spark / SPARK-48666

A filter should not be pushed down if it contains an Unevaluable expression


Details

    Description

      We should avoid pushing down an Unevaluable expression, as it can cause unexpected failures. For example, the code snippet below (assuming there is a table t with a partition column p)

      from pyspark.sql import SparkSession
      from pyspark.sql.types import StringType

      import pyspark.sql.functions as f

      # Assumes an existing Spark session; in the pyspark shell `spark` is predefined.
      spark = SparkSession.builder.getOrCreate()

      def getdata(p: str) -> str:
          return "data"

      NEW_COLUMN = 'new_column'
      P_COLUMN = 'p'

      f_getdata = f.udf(getdata, StringType())
      rows = spark.sql("select * from default.t")

      table = rows.withColumn(NEW_COLUMN, f_getdata(f.col(P_COLUMN)))

      df = table.alias('t1').join(table.alias('t2'), (f.col(f"t1.{NEW_COLUMN}") == f.col(f"t2.{NEW_COLUMN}")), how='inner')

      df.show()

      will cause an error like:

      org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: getdata(input[0, string, true])#16
          at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
          at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
          at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotEvaluateExpressionError(QueryExecutionErrors.scala:66)
          at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:391)
          at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:390)
          at org.apache.spark.sql.catalyst.expressions.PythonUDF.eval(PythonUDF.scala:71)
          at org.apache.spark.sql.catalyst.expressions.IsNotNull.eval(nullExpressions.scala:384)
          at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:52)
          at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$1(ExternalCatalogUtils.scala:166)
          at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$1$adapted(ExternalCatalogUtils.scala:165) 
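
      The fix implied by the title is to treat any predicate that contains an Unevaluable sub-expression (such as the PythonUDF above) as non-pushable, so it is never handed to partition pruning. A minimal Scala sketch of that kind of guard is below; the object and method names are illustrative only, not the actual patch:

      // Illustrative sketch, not the actual Spark change.
      // Idea: before predicates reach partition pruning
      // (ExternalCatalogUtils.prunePartitionsByFilter), drop any predicate whose
      // expression tree contains an Unevaluable node, since Unevaluable.eval
      // always throws.
      import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}

      object PushdownGuard {
        // A predicate can be evaluated only if no node in its tree is Unevaluable.
        def isEvaluable(predicate: Expression): Boolean =
          predicate.find(_.isInstanceOf[Unevaluable]).isEmpty

        // Keep only the predicates that partition pruning can actually evaluate;
        // the rest stay in a regular Filter above the scan.
        def pushablePredicates(predicates: Seq[Expression]): Seq[Expression] =
          predicates.filter(isEvaluable)
      }

      With such a guard, the IsNotNull(PythonUDF) predicate from the example would remain in a normal Filter instead of being evaluated during partition pruning, avoiding the INTERNAL_ERROR above.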


People

    Assignee: Wei Zheng
    Reporter: Wei Zheng
    Votes: 1
    Watchers: 3

