SPARK-27331

Schema mismatch using MicroBatchReader with column pruning


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: Spark Core
    • Environment: spark 2.3.1

    Description

      I'm writing a custom Spark streaming source. I want to support column pruning, so I did something like this:

      import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns
      import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
      import org.apache.spark.sql.types.StructType
      
      class MyMicroBatchReader(...) extends MicroBatchReader with SupportsPushDownRequiredColumns {
      
        // Starts as the full schema; replaced with the pruned one when Spark pushes down columns.
        var schema: StructType = createSchema()
      
        override def readSchema(): StructType = schema
      
        override def pruneColumns(requiredSchema: StructType): Unit = {
          schema = requiredSchema
        }
      
        ...
      
      }
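      Since the same reader instance is reused across micro-batches, the schema mutated by pruneColumns() survives into the planning of the next batch. Below is a sketch of one unverified mitigation: re-deriving the full schema at the start of every batch in setOffsetRange, the MicroBatchReader hook Spark calls before each micro-batch. Whether this avoids the assertion depends on when Spark calls pruneColumns relative to plan analysis, so treat it as a sketch only.

      import java.util.Optional
      
      import org.apache.spark.sql.sources.v2.reader.streaming.Offset
      
      // Sketch only: reset to the full schema when a new batch starts, so that
      // readSchema() reflects just the pruning applied to the current batch.
      override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = {
        schema = createSchema()  // same elided helper as in the class above
        // ... existing start/end offset bookkeeping ...
      }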
      

      If I run a streaming query that selects only a subset of the columns, the job fails. For example, running:

      spark.readStream().format("mysource").load().select("Id").writeStream().format("console").start()
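      For reference, here is a compilable Scala equivalent of that one-liner, assuming the custom source is registered under the short name "mysource" (e.g. via DataSourceRegister):

      import org.apache.spark.sql.SparkSession
      
      object PruneRepro extends App {
        val spark = SparkSession.builder()
          .appName("mysource-prune-repro")
          .master("local[*]")
          .getOrCreate()
      
        val query = spark.readStream
          .format("mysource")   // the custom MicroBatchReader source
          .load()
          .select("Id")         // triggers SupportsPushDownRequiredColumns.pruneColumns
          .writeStream
          .format("console")
          .start()
      
        query.awaitTermination()
      }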
      

      I get the following exception on the second micro-batch:

      18/06/29 15:50:01 ERROR MicroBatchExecution: Query [id = 59c13195-9d63-42c9-8f92-eb9d67e8b26c, runId = 72124019-1ab3-48a9-9503-0cf1c7d26fb9] terminated with error
      java.lang.AssertionError: assertion failed: Invalid batch: fieldA#0,fieldB#1,fieldC,Id#3,fieldD#4,fieldE#5 != Id#52
        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:417)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2$$anonfun$applyOrElse$4.apply(MicroBatchExecution.scala:416)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:416)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$2.applyOrElse(MicroBatchExecution.scala:414)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:414)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
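      To make the mismatch concrete: the analyzed streaming plan keeps the attributes of the full schema returned by the first readSchema() call, while the per-batch DataFrame is built from the schema left behind by pruneColumns(). Below is a self-contained sketch of (roughly) the failing condition; the field names are taken from the error message above, but the field types are my guesses:

      import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
      
      object SchemaMismatchDemo extends App {
        // Attributes the streaming plan was analyzed with (the full schema).
        val planSchema = StructType(Seq(
          StructField("fieldA", StringType), StructField("fieldB", StringType),
          StructField("fieldC", StringType), StructField("Id", IntegerType),
          StructField("fieldD", StringType), StructField("fieldE", StringType)))
      
        // Schema the reader reports after pruneColumns() kept only "Id".
        val batchSchema = StructType(Seq(StructField("Id", IntegerType)))
      
        // Roughly the per-batch check seen in the stack trace: compare the plan's
        // attributes against the new batch's attributes.
        assert(planSchema.size == batchSchema.size,
          s"Invalid batch: ${planSchema.fieldNames.mkString(",")} != ${batchSchema.fieldNames.mkString(",")}")
      }

      Running this throws java.lang.AssertionError with "Invalid batch: fieldA,fieldB,fieldC,Id,fieldD,fieldE != Id", i.e. the same failure shape minus Catalyst's #n expression ids (the #0, #52, etc. in the real message).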
      

      Can you please help?


          People

            Assignee: Unassigned
            Reporter: Raviv Kineret
            Votes: 0
            Watchers: 2
