  Spark / SPARK-25060

PySpark UDF in case statement is always run


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Won't Fix
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None

    Description

      When evaluating a case statement that calls a Python UDF, Spark always runs the UDF, even for rows where the branch containing the UDF call is not taken. Here's a repro case:

      from pyspark.sql.types import StringType
      
      def fail_if_x(s):
          assert s != 'x'
          return s
      
      spark.udf.register("fail_if_x", fail_if_x, StringType())
      
      df = spark.createDataFrame([(1, 'x'), (2, 'y')], ['id', 'str'])
      
      df.registerTempTable("data")
      
      spark.sql("select id, case when str <> 'x' then fail_if_x(str) else null end from data").show()
      

      This produces the following error:

      Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
        File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 189, in main 
          process() 
        File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 184, in process 
          serializer.dump_stream(func(split_index, iterator), outfile) 
        File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 104, in <lambda> 
          func = lambda _, it: map(mapper, it) 
        File "<string>", line 1, in <lambda> 
        File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 71, in <lambda> 
          return lambda *a: f(*a) 
        File "<ipython-input-1-91ba29d7e46f>", line 4, in fail_if_x 
      AssertionError
      

      This is because Python UDFs are extracted from the containing expression and evaluated in a BatchEvalPython node that is inserted as a child of the node holding that expression:

      == Physical Plan ==
      CollectLimit 21
      +- *Project [id#0L, CASE WHEN NOT (str#1 = x) THEN pythonUDF0#14 ELSE null END AS CASE WHEN (NOT (str = x)) THEN fail_if_x(str) ELSE CAST(NULL AS STRING) END#6]
         +- BatchEvalPython [fail_if_x(str#1)], [id#0L, str#1, pythonUDF0#14]
            +- Scan ExistingRDD[id#0L,str#1]
      
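      The same extraction appears to happen when the query is written with the DataFrame API instead of SQL, since the UDF is pulled into a BatchEvalPython node during physical planning either way. A minimal sketch, reusing fail_if_x and df from the repro above (fail_if_x_udf is just an illustrative name):

      from pyspark.sql import functions as F
      from pyspark.sql.types import StringType
      
      # Wrap the same Python function for use with the DataFrame API
      fail_if_x_udf = F.udf(fail_if_x, StringType())
      
      # The UDF call only appears in the branch guarded by the condition,
      # but it is still extracted and evaluated for every row, so this is
      # expected to hit the same AssertionError on the 'x' row.
      df.select(
          'id',
          F.when(df['str'] != 'x', fail_if_x_udf(df['str']))
           .otherwise(F.lit(None))
           .alias('result')
      ).show()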

      This doesn't affect correctness, but the behavior doesn't match the Scala API, where a case expression can be used to keep values that would cause a UDF to fail from ever reaching it.
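
      Since this was resolved as Won't Fix, one practical workaround is to make the UDF itself tolerant of the values the case branch was meant to filter out, e.g. by returning null for them instead of asserting. A sketch along those lines (safe_fail_if_x is a hypothetical name, not part of the original report):

      from pyspark.sql.types import StringType
      
      def safe_fail_if_x(s):
          # Guard inside the UDF instead of relying on the case branch,
          # since the UDF is evaluated for every row anyway.
          if s == 'x':
              return None
          return s
      
      spark.udf.register("safe_fail_if_x", safe_fail_if_x, StringType())
      
      # Same query shape as the repro, but it no longer fails on the 'x' row.
      spark.sql(
          "select id, case when str <> 'x' then safe_fail_if_x(str) else null end from data"
      ).show()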


People

    • Assignee: Unassigned
    • Reporter: Ryan Blue (rdblue)
