Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 2.0.1
- Fix Version/s: None
- Environment: Python 3.5, Java 8
Description
Dunno if I'm misinterpreting something here, but this seems like a bug in how UDFs work, or in how they interface with the optimizer.
Here's a basic reproduction. I'm using length_udf() just for illustration; it could be any UDF that accesses fields that have been aliased.
import pyspark
from pyspark.sql import Row
from pyspark.sql.functions import udf, col, struct


def length(full_name):
    # The non-aliased names, FIRST and LAST, show up here, instead of
    # first_name and last_name.
    # print(full_name)
    return len(full_name.first_name) + len(full_name.last_name)


if __name__ == '__main__':
    spark = (
        pyspark.sql.SparkSession.builder
        .getOrCreate())

    length_udf = udf(length)

    names = spark.createDataFrame([
        Row(FIRST='Nick', LAST='Chammas'),
        Row(FIRST='Walter', LAST='Williams'),
    ])

    names_cleaned = (
        names
        .select(
            col('FIRST').alias('first_name'),
            col('LAST').alias('last_name'),
        )
        .withColumn('full_name', struct('first_name', 'last_name'))
        .select('full_name'))

    # We see the schema we expect here.
    names_cleaned.printSchema()

    # However, here we get an AttributeError. length_udf() cannot
    # find first_name or last_name.
    (names_cleaned
        .withColumn('length', length_udf('full_name'))
        .show())
When I run this I get a long stack trace, but the relevant portions seem to be:
  File ".../udf-alias.py", line 10, in length
    return len(full_name.first_name) + len(full_name.last_name)
  File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/sql/types.py", line 1502, in __getattr__
    raise AttributeError(item)
AttributeError: first_name
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.0.1/libexec/python/lib/pyspark.zip/pyspark/sql/types.py", line 1497, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'first_name' is not in list
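For context, the two excerpts are the same failure seen from both sides: Row's __getattr__ looks the attribute up in __fields__, gets a ValueError, and re-raises it as an AttributeError. Here is a minimal sketch of that lookup behavior, paraphrased from the traceback above; FakeRow is a hypothetical stand-in for illustration, not the actual pyspark implementation:

```python
# Hypothetical stand-in for pyspark.sql.Row's attribute lookup,
# paraphrased from the traceback above (illustration only).
class FakeRow:
    def __init__(self, fields, values):
        self.__fields__ = list(fields)
        self.__values = tuple(values)

    def __getattr__(self, item):
        try:
            # This is the line that raises ValueError in the traceback.
            idx = self.__fields__.index(item)
        except ValueError:
            # ...which Row re-raises as AttributeError.
            raise AttributeError(item)
        return self.__values[idx]


# The Row handed to the UDF carries the pre-alias names FIRST/LAST,
# so looking up first_name fails exactly as in the traceback.
row = FakeRow(('FIRST', 'LAST'), ('Nick', 'Chammas'))
print(row.FIRST)  # Nick
try:
    row.first_name
except AttributeError as exc:
    print('AttributeError:', exc)
```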
Here are the relevant execution plans:
names_cleaned.explain()

== Physical Plan ==
*Project [struct(FIRST#0 AS first_name#5, LAST#1 AS last_name#6) AS full_name#10]
+- Scan ExistingRDD[FIRST#0,LAST#1]
(names_cleaned
    .withColumn('length', length_udf('full_name'))
    .explain())

== Physical Plan ==
*Project [struct(FIRST#0 AS first_name#5, LAST#1 AS last_name#6) AS full_name#10, pythonUDF0#21 AS length#17]
+- BatchEvalPython [length(struct(FIRST#0, LAST#1))], [FIRST#0, LAST#1, pythonUDF0#21]
   +- Scan ExistingRDD[FIRST#0,LAST#1]
From the second execution plan, it looks like BatchEvalPython somehow receives the unaliased column names, whereas the Project directly above it receives the aliased names.
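In the meantime, one possible workaround (my assumption, not something verified across Spark versions) is to access the struct fields by position rather than by name inside the UDF, since positional access doesn't depend on which names the Row arrives with. A sketch, using collections.namedtuple as a stand-in for the pyspark.sql.Row the UDF receives (the real Row also supports positional access):

```python
from collections import namedtuple

# Stand-in for the pyspark.sql.Row handed to the UDF (illustration only).
Row = namedtuple('Row', ['FIRST', 'LAST'])


def length(full_name):
    # Access struct fields by position instead of by name, so the UDF
    # works whether the Row carries FIRST/LAST or first_name/last_name.
    return len(full_name[0]) + len(full_name[1])


print(length(Row('Nick', 'Chammas')))  # 11
```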
Attachments
Issue Links
- is related to: SPARK-18589 persist() resolves "java.lang.RuntimeException: Invalid PythonUDF <lambda>(...), requires attributes from more than one child" (Resolved)