Description
pandas_udf (and all Python UDFs) do not accept keyword arguments, because in `pyspark/sql/udf.py` the class `UserDefinedFunction` defines `__call__`, and also the wrapper utility method, to accept only positional args and not kwargs:
@ line 168:

```python
...
def __call__(self, *cols):
    judf = self._judf
    sc = SparkContext._active_spark_context
    return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

# This function is for improving the online help system in the interactive interpreter.
# For example, the built-in help / pydoc.help. It wraps the UDF with the docstring and
# argument annotation. (See: SPARK-19161)
def _wrapped(self):
    """
    Wrap this udf with a function and attach docstring from func
    """
    # It is possible for a callable instance without __name__ attribute or/and
    # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    # we should avoid wrapping the attributes from the wrapped function to the wrapper
    # function. So, we take out these attribute names from the default names to set and
    # then manually assign it after being wrapped.
    assignments = tuple(
        a for a in functools.WRAPPER_ASSIGNMENTS
        if a != '__name__' and a != '__module__')

    @functools.wraps(self.func, assigned=assignments)
    def wrapper(*args):
        return self(*args)
...
```
as seen in:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType, col, lit

spark = SparkSession.builder.getOrCreate()
df = spark.range(12).withColumn('b', col('id') * 2)

def ok(a, b):
    return a * b

df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')('id', 'b')).show()      # no problems
df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')(a='id', b='b')).show()  # fails with ~no stacktrace thanks to the wrapper helper
```

```
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-2-8ba6c4344dc7> in <module>()
----> 1 df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')(a='id',b='b')).show()

TypeError: wrapper() got an unexpected keyword argument 'a'
```
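The failure does not depend on Spark internals; any `functools.wraps`-decorated wrapper declared with `*args` only will reject keyword arguments the same way. A minimal plain-Python reproduction (the `make_wrapper` helper is hypothetical, mirroring the `_wrapped` pattern above):

```python
import functools

def make_wrapper(func):
    # Mirrors the pattern in _wrapped: the inner wrapper forwards
    # positional arguments only, so keyword calls are rejected.
    @functools.wraps(func)
    def wrapper(*args):
        return func(*args)
    return wrapper

def ok(a, b):
    return a * b

wrapped = make_wrapper(ok)
print(wrapped(3, 4))   # 12 -- positional call works

try:
    wrapped(a=3, b=4)  # same TypeError as the pandas_udf case
except TypeError as e:
    print(e)           # wrapper() got an unexpected keyword argument 'a'
```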
Discussion: it isn't straightforward to simply pass the kwargs back through, allowing the UDF to be called with keywords, because the `cols` tuple passed in the `__call__` method:

_to_seq(sc, cols, _to_java_column)

has to be in the right order based on the function's declared argument order, or the UDF will return incorrect results. So the challenge here is to:
(a) make sure to reconstruct the proper order of the full args/kwargs
--> args first, then kwargs (not in the order passed, but in the order declared by the function)
(b) handle Python 2 and Python 3 `inspect` module inconsistencies
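A sketch of how (a) and (b) might be handled: a hypothetical helper (not part of pyspark) that merges `*args` and `**kwargs` into the positional order the wrapped function declares, falling back from `inspect.getfullargspec` (Python 3) to `inspect.getargspec` (Python 2):

```python
import inspect

def ordered_cols(func, args, kwargs):
    """Hypothetical helper: merge args and kwargs into the positional
    order declared by func, per steps (a) and (b) above."""
    try:
        # Python 3
        argnames = inspect.getfullargspec(func).args
    except AttributeError:
        # Python 2 fallback: getfullargspec does not exist
        argnames = inspect.getargspec(func).args
    cols = list(args)
    # Append keyword arguments in declaration order, not call order.
    for name in argnames[len(args):]:
        if name in kwargs:
            cols.append(kwargs[name])
    return tuple(cols)

def ok(a, b, c):
    return a * b + c

print(ordered_cols(ok, (1,), {'c': 3, 'b': 2}))  # (1, 2, 3)
```

A real fix would also need to reject duplicate or unknown keyword names, but the ordering logic above is the core of it.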