Spark / SPARK-35096

foreachBatch throws ArrayIndexOutOfBoundsException if schema is case-insensitive


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.3, 3.1.2, 3.2.0
    • Component/s: Spark Core
    • Labels: None

    Description

      The code below works fine before Spark 3; running it on Spark 3 throws

      java.lang.ArrayIndexOutOfBoundsException

      import org.apache.spark.sql.DataFrame
      import org.apache.spark.sql.streaming.Trigger
      import org.apache.spark.sql.types.{LongType, StructType}
      import spark.implicits._ // for the $"col" column syntax

      val inputPath = "/Users/xyz/data/testcaseInsensitivity"
      val output_path = "/Users/xyz/output"

      spark.range(10).write.format("parquet").save(inputPath)

      def process_row(microBatch: DataFrame, batchId: Long): Unit = {
        // The schema declares the column as "id"; referencing it as "ID"
        // should resolve under the default case-insensitive analysis,
        // but on Spark 3 it crashes in the optimizer instead.
        val df = microBatch.select($"ID".alias("other")) // Doesn't work
        df.write.format("parquet").mode("append").save(output_path)
      }

      val schema = new StructType().add("id", LongType)

      val stream_df = spark.readStream.schema(schema).format("parquet").load(inputPath)
      stream_df.writeStream.trigger(Trigger.Once).foreachBatch(process_row _)
        .start().awaitTermination()
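
      If the case-mismatched reference is what triggers the failure (an assumption
      based on the issue title and the "Doesn't work" comment above), a workaround
      is to reference the column with the exact case declared in the schema:

      def process_row(microBatch: DataFrame, batchId: Long): Unit = {
        // Matching the declared case ("id") avoids the failing lookup entirely
        val df = microBatch.select($"id".alias("other"))
        df.write.format("parquet").mode("append").save(output_path)
      }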
      

      Stack Trace:

      Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
        at org.apache.spark.sql.types.StructType.apply(StructType.scala:414)
        at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.$anonfun$applyOrElse$3(objects.scala:216)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:215)
        at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:203)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298)
        at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:203)
        at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:121)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
        at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
        at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:197)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:197)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
        at process_row(<console>:32)
        at $anonfun$res4$1(<console>:30)
        at $anonfun$res4$1$adapted(<console>:30)
        at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:573)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:571)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        ... 1 more
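
      Analysis: the top frame is StructType.apply, which for an integer argument
      simply indexes the struct's underlying fields array. A plausible reading of
      the trace, offered as an assumption rather than a confirmed diagnosis, is
      that the ObjectSerializerPruning rule matched the requested field name
      against the schema case-sensitively, found no match for "ID", and then
      indexed into an empty struct:

      // Minimal sketch of the suspected failing lookup, not Spark source code.
      import org.apache.spark.sql.types.{LongType, StructType}

      val declared = new StructType().add("id", LongType)
      // A case-sensitive name match leaves the pruned struct empty ("ID" != "id")
      val pruned = StructType(declared.filter(_.name == "ID"))
      // Indexing field 0 of an empty StructType raises the same
      // java.lang.ArrayIndexOutOfBoundsException: 0 seen above
      pruned(0)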
      


          People

            Assignee: Sandeep Katta (sandeep.katta2007)
            Reporter: Sandeep Katta (sandeep.katta2007)
            Votes: 0
            Watchers: 2
