SPARK-26147: Python UDFs in join condition fail even when using columns from only one side of join


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.4.0
    • Fix Version/s: 2.4.1, 3.0.0
    • Component/s: PySpark
    • Labels: None

    Description

      The rule PullOutPythonUDFInJoinCondition was introduced in https://github.com/apache/spark/commit/2a8cbfddba2a59d144b32910c68c22d0199093fe
      As far as I understand, this rule is intended to prevent the use of Python UDFs in a join condition when they take arguments from both sides of the join and the join type makes that unsupported (an illustration of that case is sketched after the repro below).

      The rule PullOutPythonUDFInJoinCondition seems to assume that if a given UDF only uses columns from a single side of the join, it will already have been pushed down below the join before the rule runs (a manual version of that push-down is sketched as a possible workaround after the repro).

      However, this is not always the case. Here's a simple example that fails, even though it looks like it should run just fine (and it does in earlier versions of Spark):

      from pyspark.sql import Row
      from pyspark.sql.types import StringType
      from pyspark.sql.functions import udf

      cars_list = [Row("NL", "1234AB"), Row("UK", "987654")]
      insurance_list = [Row("NL-1234AB"), Row("BE-112233")]

      spark.createDataFrame(data=cars_list, schema=["country", "plate_nr"]).createOrReplaceTempView("cars")
      spark.createDataFrame(data=insurance_list, schema=["insurance_code"]).createOrReplaceTempView("insurance")

      # Register the Python UDF so it can be called from SQL.
      to_insurance_code = udf(lambda x, y: x + "-" + y, StringType())
      spark.udf.register('to_insurance_code', to_insurance_code)

      spark.conf.set("spark.sql.crossJoin.enabled", "true")

      # This query runs just fine.
      spark.sql("""
        SELECT country, plate_nr, insurance_code
        FROM cars LEFT OUTER JOIN insurance
        ON CONCAT(country, '-', plate_nr) = insurance_code
      """).show()

      # This equivalent query fails with:
      # pyspark.sql.utils.AnalysisException: u'Using PythonUDF in join condition of join type LeftOuter is not supported.;'
      spark.sql("""
        SELECT country, plate_nr, insurance_code
        FROM cars LEFT OUTER JOIN insurance
        ON to_insurance_code(country, plate_nr) = insurance_code
      """).show()
      
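      A possible workaround, sketched below (not from the original report, and not verified against 2.4.0): evaluate the Python UDF in a projection on the cars side first, so the join condition itself only compares plain columns. This is effectively the manual version of the push-down the rule assumes has already happened.

      # Workaround sketch (hypothetical): compute the UDF result below the join,
      # then join on the derived column instead of calling the UDF in the ON clause.
      spark.sql("""
        SELECT c.country, c.plate_nr, i.insurance_code
        FROM (
          SELECT country, plate_nr, to_insurance_code(country, plate_nr) AS code
          FROM cars
        ) c
        LEFT OUTER JOIN insurance i
        ON c.code = i.insurance_code
      """).show()
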

      cloud_fan XuanYuan fyi


          People

            Assignee: Wenchen Fan (cloud_fan)
            Reporter: Ala Luszczak (ala.luszczak)
            Votes: 0
            Watchers: 3
