Spark / SPARK-23883

Error with conversion to Arrow while using pandas_udf


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: PySpark
    • Labels: None
    • Environment: Spark 2.3.0, Python 3.5, Java 1.8.0_161-b12

    Description

      Hi,

      I have code that works on Databricks but doesn't work on a local Spark installation.

      This is the code I'm running:

      from pyspark.sql.functions import pandas_udf
      import pandas as pd
      import numpy as np
      from pyspark.sql.types import *

      # Output schema of the per-car result
      schema = StructType([
        StructField("Distance", FloatType()),
        StructField("CarId", IntegerType())
      ])


      def haversine(lon1, lat1, lon2, lat2):
          # Calculate distance, return scalar
          return 3.5  # Removed logic to facilitate reading


      @pandas_udf(schema)
      def totalDistance(oneCar):
          # Distance from each point to the previous one of the same car
          dist = haversine(oneCar.Longitude.shift(1),
                           oneCar.Latitude.shift(1),
                           oneCar.loc[1:, 'Longitude'],
                           oneCar.loc[1:, 'Latitude'])

          # One row per car; Spark 2.3 matches the returned columns to the
          # schema by position, so list them in schema order
          return pd.DataFrame({"CarId": oneCar['CarId'].iloc[0], "Distance": np.sum(dist)},
                              index=[0], columns=["Distance", "CarId"])


      ## Calculate the overall distance made by each car
      # df: a Spark DataFrame with CarId, Longitude and Latitude columns (not shown)
      distancePerCar = df.groupBy('CarId').apply(totalDistance)
      

      I'm getting the following exception, saying that the type can't be converted to Arrow:

      ---------------------------------------------------------------------------
      TypeError                                 Traceback (most recent call last)
      C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
          114             try:
      --> 115                 to_arrow_type(self._returnType_placeholder)
          116             except TypeError:
      
      C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in to_arrow_type(dt)
         1641     else:
      -> 1642         raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
         1643     return arrow_type
      
      TypeError: Unsupported type in conversion to Arrow: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true)))
      
      During handling of the above exception, another exception occurred:
      
      NotImplementedError                       Traceback (most recent call last)
      <ipython-input-35-4f2194cfb998> in <module>()
           18     km = 6367 * c
           19     return km
      ---> 20 @pandas_udf("CarId: int, Distance: float")
           21 def totalDistance(oneUser):
           22     dist = haversine(oneUser.Longtitude.shift(1), oneUser.Latitude.shift(1),
      
      C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _create_udf(f, returnType, evalType)
           62     udf_obj = UserDefinedFunction(
           63         f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
      ---> 64     return udf_obj._wrapped()
           65 
           66 
      
      C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _wrapped(self)
          184 
          185         wrapper.func = self.func
      --> 186         wrapper.returnType = self.returnType
          187         wrapper.evalType = self.evalType
          188         wrapper.deterministic = self.deterministic
      
      C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
          117                 raise NotImplementedError(
          118                     "Invalid returnType with scalar Pandas UDFs: %s is "
      --> 119                     "not supported" % str(self._returnType_placeholder))
          120         elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
          121             if isinstance(self._returnType_placeholder, StructType):
      
      NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true))) is not supported

      I've also tried changing the schema to

      @pandas_udf("<CarId:int,Distance:float>") 

      and

      @pandas_udf("CarId:int,Distance:float")

       

      As mentioned, this works on a Databricks instance in Azure, but not locally.
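The body of haversine was removed from the report for readability; the traceback's source excerpt ends with km = 6367 * c, which suggests an Earth radius of 6367 km. Below is a minimal vectorized sketch of such a per-car computation, assuming Longitude and Latitude columns in decimal degrees; the names haversine_km, total_distance and EARTH_RADIUS_KM are hypothetical. Because a grouped-map UDF receives one plain pandas DataFrame per group, a function like this can be checked with pandas alone before it is wrapped with pandas_udf.

```python
import numpy as np
import pandas as pd

# Radius taken from the "km = 6367 * c" line visible in the report's traceback.
EARTH_RADIUS_KM = 6367


def haversine_km(lon1, lat1, lon2, lat2):
    """Great-circle distance in km between points given in decimal degrees.

    Works element-wise on scalars, NumPy arrays, or pandas Series.
    """
    lon1, lat1, lon2, lat2 = map(np.radians, (lon1, lat1, lon2, lat2))
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    c = 2 * np.arcsin(np.sqrt(a))
    return EARTH_RADIUS_KM * c


def total_distance(one_car):
    """Sum of distances between consecutive points of one car (one group)."""
    dist = haversine_km(one_car["Longitude"].shift(1), one_car["Latitude"].shift(1),
                        one_car["Longitude"], one_car["Latitude"])
    # The first row has no predecessor, so its distance is NaN; Series.sum skips it.
    return pd.DataFrame({"CarId": [one_car["CarId"].iloc[0]],
                         "Distance": [dist.sum()]},
                        columns=["CarId", "Distance"])
```

Once it behaves as expected on a sample DataFrame, it could be wrapped as pandas_udf(total_distance, schema, PandasUDFType.GROUPED_MAP) with a schema whose fields are in CarId, Distance order.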

          People

            Assignee: Unassigned
            Reporter: Omri (omri374)