SPARK-25257: v2 MicroBatchReaders can't resume from checkpoints


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels: None

Description

      When resuming from a checkpoint:

      writeStream.option("checkpointLocation", "/tmp/checkpoint").format("console").start
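
      A fuller, self-contained repro might look like the following (a minimal sketch: the source short name "augmented-diffs" is hypothetical, standing in for any custom DataSourceV2 MicroBatchReader restarted against an existing checkpoint):

      import org.apache.spark.sql.SparkSession

      object CheckpointResumeRepro {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
            .appName("v2-checkpoint-resume")
            .master("local[2]")
            .getOrCreate()

          // Hypothetical custom v2 source; any MicroBatchReader-based format
          // whose offsets round-trip through the offset log behaves the same.
          val df = spark.readStream
            .format("augmented-diffs")
            .load()

          // The first run populates /tmp/checkpoint; a second run resuming from
          // the same checkpoint fails while restoring offsets, as shown below.
          val query = df.writeStream
            .option("checkpointLocation", "/tmp/checkpoint")
            .format("console")
            .start()

          query.awaitTermination()
        }
      }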
      

      The stream reader fails with:

      osmesa.common.streaming.AugmentedDiffMicroBatchReader@59e19287
      	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
      Caused by: java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
      	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
      	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      	at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
      	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
      	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
      	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
      	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
      	... 1 more
      

      The root cause appears to be that the SerializedOffset (the JSON offset read back from disk) is never deserialized into the reader's own offset type; I would expect to see something along the lines of reader.deserializeOffset(off.json) here (unless available is intended to be deserialized elsewhere):

      https://github.com/apache/spark/blob/4dc82259d81102e0cb48f4cb2e8075f80d899ac4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L405
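
      For illustration, here is a hedged sketch of the kind of guard being described, paraphrasing the code at the link above (the helper name resolveOffset is mine, not Spark's, and this is not the actual contents of deserialize.patch):

      import org.apache.spark.sql.execution.streaming.{Offset => OffsetV1, SerializedOffset}
      import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}

      // On a fresh run the offsets the stream tracks are the reader's own
      // OffsetV2 subclasses; after a restart they are SerializedOffset (raw JSON
      // restored from the offset log), so a blind asInstanceOf[OffsetV2] fails
      // with the ClassCastException above.
      def resolveOffset(reader: MicroBatchReader, available: OffsetV1): OffsetV2 =
        available match {
          case s: SerializedOffset => reader.deserializeOffset(s.json) // restored from disk
          case o: OffsetV2         => o                                // produced this run
        }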

Attachments

    1. deserialize.patch (1 kB, uploaded by Seth Fitzsimmons)


People

    Assignee: Unassigned
    Reporter: Seth Fitzsimmons (mojodna)