Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-17100

pyspark filter on a udf column after join gives java.lang.UnsupportedOperationException

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: PySpark
    • Labels:
      None
    • Environment:

      spark-2.0.0-bin-hadoop2.7. Python2 and Python3.

      Description

      In pyspark, when filtering on a udf derived column after some join types,
      the optimized logical plan results is a java.lang.UnsupportedOperationException.

      I could not replicate this in scala code from the shell, just python. It is a pyspark regression from spark 1.6.2.

      This can be replicated with: bin/spark-submit bug.py

      import pyspark.sql.functions as F
      from pyspark.sql import Row, SparkSession
      
      if __name__ == '__main__':
          spark = SparkSession.builder.appName("test").getOrCreate()
          left = spark.createDataFrame([Row(a=1)])
          right = spark.createDataFrame([Row(a=1)])
          df = left.join(right, on='a', how='left_outer')
          df = df.withColumn('b', F.udf(lambda x: 'x')(df.a))
          df = df.filter('b = "x"')
          df.explain(extended=True)
      

      The output is:

      == Parsed Logical Plan ==
      'Filter ('b = x)
      +- Project [a#0L, <lambda>(a#0L) AS b#8]
         +- Project [a#0L]
            +- Join LeftOuter, (a#0L = a#3L)
               :- LogicalRDD [a#0L]
               +- LogicalRDD [a#3L]
      
      == Analyzed Logical Plan ==
      a: bigint, b: string
      Filter (b#8 = x)
      +- Project [a#0L, <lambda>(a#0L) AS b#8]
         +- Project [a#0L]
            +- Join LeftOuter, (a#0L = a#3L)
               :- LogicalRDD [a#0L]
               +- LogicalRDD [a#3L]
      
      == Optimized Logical Plan ==
      java.lang.UnsupportedOperationException: Cannot evaluate expression: <lambda>(input[0, bigint, true])
      == Physical Plan ==
      java.lang.UnsupportedOperationException: Cannot evaluate expression: <lambda>(input[0, bigint, true])
      

      It fails when the join is:

      • how='outer', on=column expression
      • how='left_outer', on=string or column expression
      • how='right_outer', on=string or column expression

      It passes when the join is:

      • how='inner', on=string or column expression
      • how='outer', on=string

      I made some tests to demonstrate each of these.

      Run with bin/spark-submit test_bug.py

        Attachments

        1. bug.py
          0.4 kB
          Tim Sell
        2. test_bug.py
          3 kB
          Tim Sell

          Issue Links

            Activity

              People

              • Assignee:
                davies Davies Liu
                Reporter:
                tim_s Tim Sell
              • Votes:
                0 Vote for this issue
                Watchers:
                6 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: