SPARK-6411: PySpark DataFrames can't be created if any datetimes have timezones


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.5.0
    • Component/s: PySpark, SQL
    • Labels: None

    Description

      I am unable to create a DataFrame with PySpark if any of the datetime objects passing through the conversion process have their tzinfo attribute set.

      This works fine:

      In [9]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10),)]).toDF().collect()
      Out[9]: [Row(_1=datetime.datetime(2014, 7, 8, 11, 10))]
      

      As expected, the tuple's schema is inferred as a single anonymous column of datetime type, and the datetime round-trips through the Java-side Python deserialization and back into Python on collect(). This, however:

      In [5]: from dateutil.tz import tzutc
      
      In [10]: sc.parallelize([(datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()),)]).toDF().collect()
      

      explodes with

      Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
      : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 12, localhost): net.razorvine.pickle.PickleException: invalid pickle data for datetime; expected 1 or 7 args, got 2
      	at net.razorvine.pickle.objects.DateTimeConstructor.createDateTime(DateTimeConstructor.java:69)
      	at net.razorvine.pickle.objects.DateTimeConstructor.construct(DateTimeConstructor.java:32)
      	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
      	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
      	at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
      	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
      	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:154)
      	at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:153)
      	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
      	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
      	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
      	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
      	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
      	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.to(SerDeUtil.scala:114)
      	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
      	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toBuffer(SerDeUtil.scala:114)
      	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
      	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.toArray(SerDeUtil.scala:114)
      	at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
      	at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
      	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
      	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1520)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
      	at org.apache.spark.scheduler.Task.run(Task.scala:64)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:744)
      
      Driver stacktrace:
      	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1211)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1200)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1199)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
      	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
      	at scala.Option.foreach(Option.scala:236)
      	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1401)
      	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
      	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      

      By the looks of the error, the Java depickler isn't expecting the pickle stream to supply that extra timezone argument: judging by the message, Pyrolite's DateTimeConstructor accepts either a single packed-string argument or seven individual field arguments, hence "expected 1 or 7 args, got 2".
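
      That extra argument comes straight from datetime's __reduce__ on the Python side: a naive datetime reduces to a 1-tuple of packed state, while an aware one appends the tzinfo object as a second member (Python 2 REPL):

      >>> import datetime
      >>> from dateutil.tz import tzutc
      >>> datetime.datetime(2014, 7, 8, 11, 10).__reduce__()[1]
      ('\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00',)
      >>> datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc()).__reduce__()[1]
      ('\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00', tzutc())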

      Here's the disassembled pickle stream for a timezone-less datetime:

      >>> import datetime, pickle, pickletools
      >>> obj = datetime.datetime(2014, 7, 8, 11, 10)
      >>> stream = pickle.dumps(obj)
      >>> pickletools.dis(stream)
          0: c    GLOBAL     'datetime datetime'
         19: p    PUT        0
         22: (    MARK
         23: S        STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
         65: p        PUT        1
         68: t        TUPLE      (MARK at 22)
         69: p    PUT        2
         72: R    REDUCE
         73: p    PUT        3
         76: .    STOP
      highest protocol among opcodes = 0
      

      and then for one with a timezone:

      >>> from dateutil.tz import tzutc
      >>> obj = datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())
      >>> stream = pickle.dumps(obj)
      >>> pickletools.dis(stream)
          0: c    GLOBAL     'datetime datetime'
         19: p    PUT        0
         22: (    MARK
         23: S        STRING     '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00'
         65: p        PUT        1
         68: c        GLOBAL     'copy_reg _reconstructor'
         93: p        PUT        2
         96: (        MARK
         97: c            GLOBAL     'dateutil.tz tzutc'
        116: p            PUT        3
        119: c            GLOBAL     'datetime tzinfo'
        136: p            PUT        4
        139: g            GET        4
        142: (            MARK
        143: t                TUPLE      (MARK at 142)
        144: R            REDUCE
        145: p            PUT        5
        148: t            TUPLE      (MARK at 96)
        149: p        PUT        6
        152: R        REDUCE
        153: p        PUT        7
        156: t        TUPLE      (MARK at 22)
        157: p    PUT        8
        160: R    REDUCE
        161: p    PUT        9
        164: .    STOP
      highest protocol among opcodes = 0
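
      In both dumps the 10-byte STRING is just datetime's packed state: a big-endian two-byte year, then one byte each for month, day, hour, minute and second, and three bytes of microseconds (that layout is my reading of CPython's datetime pickling, and it checks out here):

      >>> import struct
      >>> struct.unpack('>H8B', '\x07\xde\x07\x08\x0b\n\x00\x00\x00\x00')
      (2014, 7, 8, 11, 10, 0, 0, 0, 0)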
      

      I would bet that the Pyrolite library is missing support for that nested tzinfo object arriving as a second member of the datetime reconstruction tuple. Has anyone hit this before? Is there any more information I can provide?
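
      In the meantime, a workaround on our side is to normalize everything to naive UTC before it gets pickled, so only the 1-arg form ever reaches Pyrolite. A minimal sketch (to_naive_utc is just a local helper, not part of any API); with the tzinfo dropped, this takes the same path as the working naive example above:

      In [11]: from dateutil.tz import tzutc

      In [12]: def to_naive_utc(dt):
          ...:     # shift to UTC, then drop the tzinfo so pickle emits the 1-arg form
          ...:     return dt.astimezone(tzutc()).replace(tzinfo=None)

      In [13]: sc.parallelize([(to_naive_utc(datetime.datetime(2014, 7, 8, 11, 10, tzinfo=tzutc())),)]).toDF().collect()
      Out[13]: [Row(_1=datetime.datetime(2014, 7, 8, 11, 10))]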


People

    Assignee: Davies Liu (davies)
    Reporter: Harry Brundage (airhorns)
