Description
There's a latent corner-case bug in PySpark UDF evaluation: executing a stage containing a single UDF that takes more than one argument, where one of those arguments is repeated, crashes at execution time with a confusing error.
Here's a repro:
from pyspark.sql.types import *
spark.catalog.registerFunction("add", lambda x, y: x + y, IntegerType())
spark.sql("SELECT add(1, 1)").first()
This fails with
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/joshrosen/Documents/spark/python/lib/pyspark.zip/pyspark/worker.py", line 180, in main
    process()
  File "/Users/joshrosen/Documents/spark/python/lib/pyspark.zip/pyspark/worker.py", line 175, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/joshrosen/Documents/spark/python/lib/pyspark.zip/pyspark/worker.py", line 107, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/Users/joshrosen/Documents/spark/python/lib/pyspark.zip/pyspark/worker.py", line 93, in <lambda>
    mapper = lambda a: udf(*a)
  File "/Users/joshrosen/Documents/spark/python/lib/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
    return lambda *a: f(*a)
TypeError: <lambda>() takes exactly 2 arguments (1 given)
The problem was introduced by SPARK-14267: the code there has a fast path for handling a batch UDF evaluation consisting of a single Python UDF, but that branch incorrectly assumes that a single UDF won't have repeated arguments and therefore skips the code for unpacking arguments from the input row (whose schema may not necessarily match the UDF's inputs).
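To make the failure mode concrete, here is a minimal sketch of what goes wrong in the worker (the names and the arg_offsets remapping are illustrative assumptions, not Spark's actual internals):

# Illustrative sketch only; the arg_offsets remapping below is an
# assumption for exposition, not Spark's actual internal code.
udf = lambda x, y: x + y

# add(1, 1): both arguments are the same expression, so after deduplication
# the row shipped to the Python worker carries only one column.
row = (1,)

# Buggy fast path: assumes the row's columns line up one-to-one with the
# UDF's parameters, so a deduplicated row comes up short.
try:
    udf(*row)
except TypeError as e:
    print(e)  # the TypeError from the stack trace above

# Correct general path: remap each parameter to its column in the input row
# before calling the UDF, so repeated arguments are re-expanded.
arg_offsets = [0, 0]  # both parameters read column 0
print(udf(*[row[o] for o in arg_offsets]))  # prints 2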
I have a simple fix for this which I'll submit now.
Issue Links
- relates to SPARK-14267 Execute multiple Python UDFs in single batch (Resolved)