Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Not A Problem
Affects Version/s: 2.3.0
Spark 2.3.0
Python 3.5
Java 1.8.0_161-b12
Description
Hi,
I have code that works on Databricks but fails on a local Spark installation.
This is the code I'm running:
from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql.types import *

schema = StructType([
    StructField("Distance", FloatType()),
    StructField("CarId", IntegerType())
])

def haversine(lon1, lat1, lon2, lat2):
    # Calculate distance, return scalar
    return 3.5  # Removed logic to facilitate reading

@pandas_udf(schema)
def totalDistance(oneCar):
    dist = haversine(oneCar.Longtitude.shift(1),
                     oneCar.Latitude.shift(1),
                     oneCar.loc[1:, 'Longitude'],
                     oneCar.loc[1:, 'Latitude'])
    return pd.DataFrame({"CarId": oneCar['CarId'].iloc[0], "Distance": np.sum(dist)}, index=[0])

## Calculate the overall distance made by each car
distancePerCar = df.groupBy('CarId').apply(totalDistance)
I get this exception, which says Arrow cannot handle the return type:
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
    114                 try:
--> 115                     to_arrow_type(self._returnType_placeholder)
    116                 except TypeError:

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in to_arrow_type(dt)
   1641     else:
-> 1642         raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
   1643     return arrow_type

TypeError: Unsupported type in conversion to Arrow: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true)))

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
<ipython-input-35-4f2194cfb998> in <module>()
     18     km = 6367 * c
     19     return km
---> 20 @pandas_udf("CarId: int, Distance: float")
     21 def totalDistance(oneUser):
     22     dist = haversine(oneUser.Longtitude.shift(1), oneUser.Latitude.shift(1),

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _create_udf(f, returnType, evalType)
     62     udf_obj = UserDefinedFunction(
     63         f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
---> 64     return udf_obj._wrapped()
     65
     66

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _wrapped(self)
    184
    185         wrapper.func = self.func
--> 186         wrapper.returnType = self.returnType
    187         wrapper.evalType = self.evalType
    188         wrapper.deterministic = self.deterministic

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
    117                     raise NotImplementedError(
    118                         "Invalid returnType with scalar Pandas UDFs: %s is "
--> 119                         "not supported" % str(self._returnType_placeholder))
    120         elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    121             if isinstance(self._returnType_placeholder, StructType):

NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true))) is not supported
I've also tried changing the schema to
@pandas_udf("<CarId:int,Distance:float>")
and
@pandas_udf("CarId:int,Distance:float")
As mentioned, this works on a Databricks instance in Azure, but not locally.
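
The NotImplementedError in the traceback mentions "scalar Pandas UDFs", which suggests the decorator was treated as the default scalar type rather than a grouped-map UDF. The sketch below is one possible way to declare the function explicitly as grouped-map; it is not a confirmed fix for this report. The names haversine_km, total_distance and as_grouped_map_udf are hypothetical, the haversine body is an assumed stand-in for the logic removed from the report, and the Spark wrapper assumes pyspark 2.3+ with pyarrow installed. The per-car logic is kept as a plain pandas function so it can be checked without a Spark session.

```python
import numpy as np
import pandas as pd


def haversine_km(lon1, lat1, lon2, lat2):
    """Great-circle distance in km between paired points given in degrees.

    Assumed stand-in for the logic removed from the report.
    """
    lon1, lat1, lon2, lat2 = (
        np.radians(np.asarray(v, dtype=float)) for v in (lon1, lat1, lon2, lat2)
    )
    a = (np.sin((lat2 - lat1) / 2) ** 2
         + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2)
    return 6367 * 2 * np.arcsin(np.sqrt(a))


def total_distance(one_car):
    """Sum the distances between consecutive rows of one car's data."""
    prev = one_car[["Longitude", "Latitude"]].shift(1)
    legs = haversine_km(prev["Longitude"], prev["Latitude"],
                        one_car["Longitude"], one_car["Latitude"])
    # The first row has no predecessor, so its leg is NaN and is skipped.
    # Column order is set explicitly so it lines up with the schema below.
    return pd.DataFrame(
        {"CarId": [one_car["CarId"].iloc[0]], "Distance": [np.nansum(legs)]},
        columns=["CarId", "Distance"],
    )


def as_grouped_map_udf(func):
    """Wrap func as a grouped-map Pandas UDF (needs pyspark and pyarrow)."""
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

    schema = StructType([
        StructField("CarId", IntegerType()),
        StructField("Distance", FloatType()),
    ])
    # Passing GROUPED_MAP explicitly; without it pandas_udf defaults to scalar.
    return pandas_udf(schema, PandasUDFType.GROUPED_MAP)(func)
```

With the df from the report, this would be used as df.groupBy('CarId').apply(as_grouped_map_udf(total_distance)).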