Spark / SPARK-18254

UDFs don't see aliased column names


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.1
    • Fix Version/s: None
    • Component/s: SQL
    • Environment: Python 3.5, Java 8

    Description

      I may be misinterpreting something here, but this seems like a bug in how UDFs work, or in how they interface with the optimizer.

      Here's a basic reproduction. I'm using length_udf() just for illustration; it could be any UDF that accesses fields that have been aliased.

      import pyspark
      from pyspark.sql import Row
      from pyspark.sql.functions import udf, col, struct
      
      
      def length(full_name):
          # The non-aliased names, FIRST and LAST, show up here, instead of
          # first_name and last_name.
          # print(full_name)
          return len(full_name.first_name) + len(full_name.last_name)
      
      
      if __name__ == '__main__':
          spark = (
              pyspark.sql.SparkSession.builder
              .getOrCreate())
      
          length_udf = udf(length)
      
          names = spark.createDataFrame([
              Row(FIRST='Nick', LAST='Chammas'),
              Row(FIRST='Walter', LAST='Williams'),
          ])
      
          names_cleaned = (
              names
              .select(
                  col('FIRST').alias('first_name'),
                  col('LAST').alias('last_name'),
              )
              .withColumn('full_name', struct('first_name', 'last_name'))
              .select('full_name'))
      
          # We see the schema we expect here.
          names_cleaned.printSchema()
      
          # However, here we get an AttributeError. length_udf() cannot
          # find first_name or last_name.
          (names_cleaned
          .withColumn('length', length_udf('full_name'))
          .show())
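
      One way to sidestep the problem (my suggestion, not part of the original report) is to skip the struct entirely and pass the columns to the UDF as separate arguments, so no struct field names are involved. Here is a minimal sketch of the reworked UDF, written as a plain Python function so it runs without a Spark session; the PySpark wiring shown in the comment is hypothetical:

      def length(first_name, last_name):
          # Taking the two columns as separate arguments means the UDF
          # never depends on struct field names at all.
          return len(first_name) + len(last_name)
      
      # Hypothetical PySpark wiring (untested against this bug report):
      #   length_udf = udf(length)
      #   names_cleaned_2 = names.select(
      #       col('FIRST').alias('first_name'),
      #       col('LAST').alias('last_name'))
      #   names_cleaned_2.withColumn(
      #       'length', length_udf('first_name', 'last_name')).show()
      
      print(length('Nick', 'Chammas'))  # 11
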
      

      When I run this I get a long stack trace, but the relevant portions seem to be:

        File ".../udf-alias.py", line 10, in length
          return len(full_name.first_name) + len(full_name.last_name)
        File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/sql/types.py", line 1502, in __getattr__
          raise AttributeError(item)
      AttributeError: first_name
      
      Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
        File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/sql/types.py", line 1497, in __getattr__
          idx = self.__fields__.index(item)
      ValueError: 'first_name' is not in list
      

      Here are the relevant execution plans:

      names_cleaned.explain()
      
      == Physical Plan ==
      *Project [struct(FIRST#0 AS first_name#5, LAST#1 AS last_name#6) AS full_name#10]
      +- Scan ExistingRDD[FIRST#0,LAST#1]
      
      (names_cleaned
      .withColumn('length', length_udf('full_name'))
      .explain())
      
      == Physical Plan ==
      *Project [struct(FIRST#0 AS first_name#5, LAST#1 AS last_name#6) AS full_name#10, pythonUDF0#21 AS length#17]
      +- BatchEvalPython [length(struct(FIRST#0, LAST#1))], [FIRST#0, LAST#1, pythonUDF0#21]
         +- Scan ExistingRDD[FIRST#0,LAST#1]
      

      From the second execution plan, it looks like BatchEvalPython somehow receives the unaliased column names, whereas the Project right above it sees the aliased names.
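
      Since the plan shows BatchEvalPython preserving the field order (struct(FIRST#0, LAST#1)) even though it drops the aliases, a possible workaround (again my suggestion, not from the report) is to access the struct fields by position rather than by name inside the UDF. A stdlib-only sketch, using a namedtuple as a stand-in for the Row the UDF actually receives:

      from collections import namedtuple
      
      # Stand-in for the Row the Python UDF receives at runtime: it carries
      # the pre-alias field names (FIRST/LAST), so attribute access via the
      # aliased names (first_name/last_name) raises AttributeError.
      UnaliasedRow = namedtuple('UnaliasedRow', ['FIRST', 'LAST'])
      
      def length(full_name):
          # Positional access works regardless of which names the struct
          # fields arrive with.
          return len(full_name[0]) + len(full_name[1])
      
      print(length(UnaliasedRow(FIRST='Nick', LAST='Chammas')))  # 11
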


People

    Assignee: eyalfa Eyal Farago
    Reporter: nchammas Nicholas Chammas
    Votes: 0
    Watchers: 4
