Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
3.0.0
-
None
Description
Below code works fine before spark3, running on spark3 throwsÂ
java.lang.ArrayIndexOutOfBoundsException
val inputPath = "/Users/xyz/data/testcaseInsensitivity" val output_path = "/Users/xyz/output" spark.range(10).write.format("parquet").save(inputPath) def process_row(microBatch: DataFrame, batchId: Long): Unit = { val df = microBatch.select($"ID".alias("other")) // Doesn't work df.write.format("parquet").mode("append").save(output_path) } val schema = new StructType().add("id", LongType) val stream_df = spark.readStream.schema(schema).format("parquet").load(inputPath) stream_df.writeStream.trigger(Trigger.Once).foreachBatch(process_row _) .start().awaitTermination()
Stack Trace:
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0 at org.apache.spark.sql.types.StructType.apply(StructType.scala:414) at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.$anonfun$applyOrElse$3(objects.scala:216) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.immutable.List.map(List.scala:298) at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:215) at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$$anonfun$apply$4.applyOrElse(objects.scala:203) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:309) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:314) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397) at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298) at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:203) at org.apache.spark.sql.catalyst.optimizer.ObjectSerializerPruning$.apply(objects.scala:121) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149) at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68) at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116) at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:82) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:82) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:79) at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$4(QueryExecution.scala:197) at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381) at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:197) at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:95) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269) at process_row(<console>:32) at $anonfun$res4$1(<console>:30) at $anonfun$res4$1$adapted(<console>:30) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:573) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:571) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191) at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334) ... 1 more