Spark / SPARK-26278

V2 Streaming sources cannot be written to V1 sinks


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.2
    • Fix Version/s: None
    • Labels: None

      Description

      Starting from a streaming DataFrame derived from a custom v2 MicroBatchReader, we have:

      import org.apache.spark.sql.DataFrame

      val df: DataFrame = ... // streaming DataFrame backed by a custom v2 MicroBatchReader
      assert(df.isStreaming)

      val outputFormat = "orc" // also applies to "csv" and "json", but not "console"

      df.writeStream
        .format(outputFormat)
        .option("checkpointLocation", "/tmp/checkpoints")
        .option("path", "/tmp/result")
        .start()
      

      This code fails with the following stack trace:

      2018-12-04 08:24:27 ERROR MicroBatchExecution:91 - Query [id = 193f97bf-8064-4658-8aa6-0f481919eafe, runId = e96ed7e5-aaf4-4ef4-a3f3-05fe0b01a715] terminated with error
      java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
          at scala.collection.Iterator$class.foreach(Iterator.scala:893)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
          at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
          at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
          at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
          at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
          at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
          at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
          at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
          at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
          at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
          at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
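
      For what it's worth, the cast appears to fail because offsets recovered from the checkpoint's offset log are replayed as SerializedOffset (a thin wrapper around the persisted JSON), and the 2.3.x code path for v1 sinks in MicroBatchExecution.runBatch casts them straight to the v2 Offset type instead of routing them through the reader's deserializeOffset hook. A minimal sketch of the v2 offset contract involved (MyOffset and its field are made-up names for illustration):

      import org.apache.spark.sql.sources.v2.reader.streaming.Offset

      // Hypothetical offset type for a custom v2 source; not part of Spark.
      case class MyOffset(position: Long) extends Offset {
        // The engine persists this JSON to the offset log.
        override def json: String = position.toString
      }

      // Inside the corresponding MicroBatchReader implementation, the reader
      // rebuilds its offset from that JSON when the engine calls:
      //   override def deserializeOffset(json: String): Offset = MyOffset(json.toLong)

      The working "console" case presumably avoids this because the console sink goes through the v2 write path, which does deserialize the logged offsets before handing them to the reader.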

      I'm filing this issue at the suggestion of Seth Fitzsimmons, who suggests that the problem could be resolved by backporting the streaming sinks from Spark 2.4.0.
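
      Until such a backport lands, one possible stopgap on Spark 2.4+ is to route the stream through foreachBatch, which bypasses the v1 file sink and writes each micro-batch with the batch writer (a sketch, reusing the paths from the snippet above):

      import org.apache.spark.sql.DataFrame

      df.writeStream
        .option("checkpointLocation", "/tmp/checkpoints")
        .foreachBatch { (batch: DataFrame, batchId: Long) =>
          // Plain batch write per micro-batch; dedup/idempotence is on the caller.
          batch.write.mode("append").orc("/tmp/result")
        }
        .start()

      Note that foreachBatch only guarantees at-least-once delivery unless the write itself is idempotent, so this is a workaround rather than a substitute for the proper sink fix.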

    Attachments

    Activity

    People

    • Assignee: Unassigned
    • Reporter: Justin Polchlopek (jpolchlo)

    Dates

    • Created:
    • Updated:
    • Resolved:
