Description
I am unable to create a DataFrame with PySpark if any of the datetime objects passed through the conversion have their tzinfo attribute set.
This works fine:
In [9]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10),)]).toDF().collect()
Out[9]: [Row(_1=datetime.datetime(2014, 7, 8, 11, 10))]
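For reference, the inference maps the Python datetime to Spark's TimestampType; inspecting the schema in the same session should confirm the single anonymous column (a sketch, the exact repr may vary by Spark version):

df = sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10),)]).toDF()
df.schema  # roughly: StructType(List(StructField(_1,TimestampType,true)))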
As expected, the tuple's schema is inferred as having one anonymous column holding a datetime, and the datetime round-trips through the Java-side deserialization and back into Python on collect(). This, however:
In [5]: from dateutil.tz import tzutc
In [10]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()),)]).toDF().collect()
explodes with:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 12, localhost): net.razorvine.pickle.PickleException: invalid pickle data for datetime; expected 1 or 7 args, got 2
	at net.razorvine.pickle.objects.DateTimeConstructor.createDateTime(DateTimeConstructor.java:69)
	at net.razorvine.pickle.objects.DateTimeConstructor.construct(DateTimeConstructor.java:32)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:154)
	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:114)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:114)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:114)
	at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
	at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1401)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
By the looks of the error, the Java depickler isn't expecting the pickle stream to supply the extra timezone constructor argument: it sees two arguments where it expects either one (the packed byte string) or seven (presumably the individual datetime fields).
Here's the disassembled pickle stream for a timezone-less datetime:
>>> import datetime, pickle, pickletools
>>> object = datetime.datetime(2014, 7, 8, 11, 10)
>>> stream = pickle.dumps(object)
>>> pickletools.dis(stream)
    0: c    GLOBAL     'datetime datetime'
   19: p    PUT        0
   22: (    MARK
   23: S        STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
   65: p        PUT        1
   68: t        TUPLE      (MARK at 22)
   69: p    PUT        2
   72: R    REDUCE
   73: p    PUT        3
   76: .    STOP
highest protocol among opcodes = 0
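The 10-byte STRING is just the packed datetime fields (two bytes of year, then month, day, hour, minute, second, and three bytes of microseconds). A quick decode to illustrate, my own addition rather than part of the original session:

import struct

data = b'\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
# First 7 bytes: big-endian 2-byte year, then month/day/hour/minute/second.
year, month, day, hour, minute, second = struct.unpack('>HBBBBB', data[:7])
# Last 3 bytes: big-endian microseconds, padded to 4 bytes for unpacking.
micro = struct.unpack('>I', b'\x00' + data[7:])[0]
print(year, month, day, hour, minute, second, micro)  # 2014 7 8 11 10 0 0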
and then for one with a timezone:
>>> from dateutil.tz import tzutc
>>> object = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())
>>> stream = pickle.dumps(object)
>>> pickletools.dis(stream)
    0: c    GLOBAL     'datetime datetime'
   19: p    PUT        0
   22: (    MARK
   23: S        STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
   65: p        PUT        1
   68: c        GLOBAL     'copy_reg _reconstructor'
   93: p        PUT        2
   96: (        MARK
   97: c            GLOBAL     'dateutil.tz tzutc'
  116: p            PUT        3
  119: c            GLOBAL     'datetime tzinfo'
  136: p            PUT        4
  139: g            GET        4
  142: (            MARK
  143: t                TUPLE      (MARK at 142)
  144: R            REDUCE
  145: p            PUT        5
  148: t            TUPLE      (MARK at 96)
  149: p            PUT        6
  152: R            REDUCE
  153: p            PUT        7
  156: t        TUPLE      (MARK at 22)
  157: p        PUT        8
  160: R        REDUCE
  161: p        PUT        9
  164: .    STOP
highest protocol among opcodes = 0
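Equivalently, datetime.__reduce__() makes the argument count visible directly (a quick check of my own, on CPython):

naive = datetime.datetime(2014, 7, 8, 11, 10)
aware = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())
len(naive.__reduce__()[1])  # 1 -- just the packed byte string
len(aware.__reduce__()[1])  # 2 -- the byte string plus the tzinfo object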
I would bet that the Pyrolite library is missing support for that nested tzinfo object as a second tuple member when reconstructing the datetime. Has anyone hit this before? Is there any more information I can provide?
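In the meantime, one possible workaround is to normalize everything to naive UTC datetimes before handing them to Spark, so the pickle stream falls back to the single-argument form. An untested sketch against the same session (drop_tz is my own helper, not a Spark API):

import datetime
from dateutil.tz import tzutc

def drop_tz(dt):
    # Convert to UTC, then discard tzinfo so pickle emits the
    # 1-argument datetime form the Java depickler understands.
    return dt.astimezone(tzutc()).replace(tzinfo=None)

aware = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())
sc.parallelize([(drop_tz(aware),)]).toDF().collect()  # times are now naive, in UTC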
Issue Links
- is related to: SPARK-7314 Upgrade Pyrolite to 4.4 (Resolved)