[SPARK-26278] V2 Streaming sources cannot be written to V1 sinks


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.2
    • Fix Version/s: None
    • Component/s: None

    Description

      Starting from a streaming DataFrame derived from a custom v2 MicroBatchReader, we have:

      val df: DataFrame = ???  // the streaming DataFrame produced by the custom v2 reader
      assert(df.isStreaming)

      val outputFormat = "orc" // also applies to "csv" and "json" but not "console"

      df.writeStream
        .format(outputFormat)
        .option("checkpointLocation", "/tmp/checkpoints")
        .option("path", "/tmp/result")
        .start()
      

      This code fails with the following stack trace:

      2018-12-04 08:24:27 ERROR MicroBatchExecution:91 - Query [id = 193f97bf-8064-4658-8aa6-0f481919eafe, runId = e96ed7e5-aaf4-4ef4-a3f3-05fe0b01a715] terminated with error
      java.lang.ClassCastException: org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to org.apache.spark.sql.sources.v2.reader.streaming.Offset
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:405)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
          at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
          at scala.collection.Iterator$class.foreach(Iterator.scala:893)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
          at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
          at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
          at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
          at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
          at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
          at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
          at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
          at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
          at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
          at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
          at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
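
      For context on the cast: offsets replayed from the checkpoint's offset log come back as SerializedOffset, a plain wrapper around the offset's JSON, and a v2 reader is expected to rebuild its concrete offset from that JSON via MicroBatchReader.deserializeOffset. The path in the trace above apparently skips that conversion and casts the wrapper straight to the v2 Offset type. A minimal sketch of the conversion contract, using a hypothetical counter-based offset (CounterOffset and CounterReader are illustrative, not Spark classes):

      import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}

      // Hypothetical concrete offset for a custom v2 source: a single counter,
      // serialized as JSON so it can round-trip through the offset log.
      case class CounterOffset(value: Long) extends Offset {
        override def json: String = value.toString
      }

      // The relevant slice of the MicroBatchReader contract: given the raw JSON
      // recovered from the offset log, rebuild the reader's own Offset subclass.
      // In the failing path above this conversion never runs, so the
      // SerializedOffset wrapper is cast to the v2 Offset type and throws.
      abstract class CounterReader extends MicroBatchReader {
        override def deserializeOffset(json: String): Offset =
          CounterOffset(json.trim.toLong)
      }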

      I'm filing this issue at the suggestion of mojodna, who believes it could be resolved by backporting the streaming sink changes from Spark 2.4.0.
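
      For comparison while pinned to 2.3.x: as noted in the repro, the same query does run with the console sink, so swapping only the format is a quick way to confirm the custom source itself is healthy (sketch, same df as above):

      // Same query as the repro, console sink instead of the v1 file sink
      // ("orc"/"csv"/"json"). Per the repro note this works on 2.3.2, which
      // fits the issue title: console presumably avoids the v1 sink path.
      df.writeStream
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints")
        .start()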

          People

            Assignee: Unassigned
            Reporter: Justin Polchlopek (jpolchlo)
