Details
Type: Sub-task
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 3.0.2, 3.1.1
Description
For a user-defined type (UDT) whose sqlType contains a TimestampType field, the conversion from pandas to pyarrow, and from pyarrow to Spark UnsafeRow, needs to be handled carefully.
From pandas to pyarrow
if isinstance(dt, UserDefinedType):
    s = s.apply(dt.serialize)
t = to_arrow_type(dt)
array = pa.StructArray.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
The above code may not work if the UDT is defined like this:
from pyspark.sql.types import (
    DoubleType, StructField, StructType, TimestampType, UserDefinedType)


class ExamplePointWithTimeUDT(UserDefinedType):
    """
    User-defined type (UDT) for ExamplePointWithTime.
    """

    @classmethod
    def sqlType(self):
        return StructType([
            StructField("x", DoubleType(), False),
            StructField("y", DoubleType(), True),
            StructField("ts", TimestampType(), False),
        ])

    @classmethod
    def module(cls):
        return 'pyspark.sql.tests'

    @classmethod
    def scalaUDT(cls):
        return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'

    def serialize(self, obj):
        return [obj.x, obj.y, obj.ts]

    def deserialize(self, datum):
        return ExamplePointWithTime(datum[0], datum[1], datum[2])


class ExamplePointWithTime:
    """
    An example class to demonstrate UDT in Scala, Java, and Python.
    """

    __UDT__ = ExamplePointWithTimeUDT()

    def __init__(self, x, y, ts):
        self.x = x
        self.y = y
        self.ts = ts

    def __repr__(self):
        return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)

    def __str__(self):
        return "(%s,%s,%s)" % (self.x, self.y, self.ts)

    def __eq__(self, other):
        return isinstance(other, self.__class__) \
            and other.x == self.x and other.y == self.y \
            and other.ts == self.ts
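One reason the "ts" field needs care is that Spark stores TimestampType values internally as microseconds since the Unix epoch, whereas serialize above returns a Python datetime inside a plain list. The following is a minimal sketch, not the fix from the PR, of how the nested timestamp might be normalized before the struct is handed to Arrow. The helpers to_epoch_micros, from_epoch_micros and serialize_point are hypothetical names, and the sketch assumes naive datetimes are UTC; Spark itself takes time zone settings into account, which this sketch ignores.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_micros(ts):
    """Hypothetical helper: datetime -> microseconds since the Unix epoch."""
    if ts.tzinfo is None:
        # Assumption for this sketch only: naive datetimes are treated as UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    delta = ts - EPOCH
    # Integer arithmetic avoids the float rounding of delta.total_seconds().
    return (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds


def from_epoch_micros(micros):
    """Hypothetical helper: microseconds since the epoch -> UTC datetime."""
    return EPOCH + timedelta(microseconds=micros)


def serialize_point(x, y, ts):
    """Hypothetical variant of serialize: a record keyed by the field names
    of sqlType, with the timestamp already converted to an integer."""
    return {"x": x, "y": y, "ts": to_epoch_micros(ts)}


record = serialize_point(1.0, 2.0, datetime(2021, 1, 1, tzinfo=timezone.utc))
restored = from_epoch_micros(record["ts"])
```

Keying the record by field name rather than by position is a design choice of this sketch; whether the Arrow conversion path in PySpark expects dicts, tuples, or lists for struct values is not stated in this issue.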
From pyarrow to Spark Internal
Serialization and deserialization of the UDT will fail.
See the failed PR demo: https://github.com/eddyxu/spark/pull/4