Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-23645

pandas_udf can not be called with keyword arguments

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Resolved
    • Minor
    • Resolution: Fixed
    • 2.3.0
    • 2.3.1, 2.4.0
    • PySpark
    • None
    • python 3.6 | pyspark 2.3.0 | Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_141

    Description

      pandas_udf (all python udfs) do not accept keyword arguments because `pyspark/sql/udf.py` class `UserDefinedFunction` has _call_, and also wrapper utility methods, that only accept args and not kwargs:

      @ line 168:

      ...
      
      def __call__(self, *cols):
          judf = self._judf
          sc = SparkContext._active_spark_context
          return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
      
      # This function is for improving the online help system in the interactive interpreter.
      # For example, the built-in help / pydoc.help. It wraps the UDF with the docstring and
      # argument annotation. (See: SPARK-19161)
      def _wrapped(self):
          """
          Wrap this udf with a function and attach docstring from func
          """
      
          # It is possible for a callable instance without __name__ attribute or/and
          # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
          # we should avoid wrapping the attributes from the wrapped function to the wrapper
          # function. So, we take out these attribute names from the default names to set and
          # then manually assign it after being wrapped.
          assignments = tuple(
              a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
      
          @functools.wraps(self.func, assigned=assignments)
          def wrapper(*args):
              return self(*args)
      
      ...

      as seen in:

      from pyspark.sql import SparkSession
      
      from pyspark.sql.functions import pandas_udf, PandasUDFType, col, lit
      
      spark = SparkSession.builder.getOrCreate()
      
      df = spark.range(12).withColumn('b', col('id') * 2)
      
      def ok(a,b): return a*b
      
      df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')('id','b')).show()  # no problems
      df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')(a='id',b='b')).show()  # fail with ~no stacktrace thanks to wrapper helper
      
      ---------------------------------------------------------------------------
      TypeError Traceback (most recent call last)
      <ipython-input-2-8ba6c4344dc7> in <module>()
      ----> 1 df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')(a='id',b='b')).show()
      
      TypeError: wrapper() got an unexpected keyword argument 'a'

       

       

      discourse: it isn't difficult to swap back in the kwargs, allowing the UDF to be called as such, but the cols tuple that gets passed in the call method:

      _to_seq(sc, cols, _to_java_column

       has to be in the right order based on the functions defined argument inputs, or the function will return incorrect results. so, the challenge here is to:

      (a) make sure to reconstruct the proper order of the full args/kwargs

      --> args first, and then kwargs (not in the order passed but in the order requested by the fn)

      (b) handle python2 and python3 `inspect` module inconsistencies 

      Attachments

        Activity

          People

            mstewart141 Stu (Michael Stewart)
            mstewart141 Stu (Michael Stewart)
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: