Details
Description
Currently i`m investigating in upgrade our code base from spark 3.3.0 to 3.5.0 (embedded in dedicated aws emr cluster).
I got the following exception if i execute my code on the cluster, if i run local unit tests the code runs as expected without exception.
24/03/26 19:32:19 INFO RecordServerQueryListener: Cleaning up temp directory - /user/KKQI7VHKTMNQZJQNMMZXKH5KYNRPOHXG/application_1711468652551_0023 Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 186) (ip-10-1-1-6.eu-central-1.compute.internal executor 2): java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3 at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:95) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_parquetMax_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142) at org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:143) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3067) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3003) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3002) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3002) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1318) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1318) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1318) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3271) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3205) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3194) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:154) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:277) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:276) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:558) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:520) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4411) at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3370) at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4401) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:625) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4399) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:129) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:165) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:165) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:164) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4399) at org.apache.spark.sql.Dataset.head(Dataset.scala:3370) at org.apache.spark.sql.Dataset.head(Dataset.scala:3377) at org.apache.spark.sql.Dataset.first(Dataset.scala:3384) at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.determineMaxDateId(MyMaintainedClass.scala:56) at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.processInternal(MyMaintainedClass.scala:25) at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.processInternal$(MyMaintainedClass.scala:22) at de.my.maintained.code.common.app.FqmApp$$anon$35.processInternal(FqmApp.scala:112) at de.my.maintained.code.common.transformer.Transformer.process(Transformer.scala:26) at de.my.maintained.code.common.transformer.Transformer.process$(Transformer.scala:24) at de.my.maintained.code.common.app.FqmApp$$anon$35.process(FqmApp.scala:112) at de.my.maintained.code.business.transformer.BusinessForecastTransformerComponent$BusinessForecastTransformer.processInternal(BusinessForecastTransformerComponent.scala:35) at de.my.maintained.code.business.transformer.BusinessForecastTransformerComponent$BusinessForecastTransformer.processInternal$(BusinessForecastTransformerComponent.scala:26) at de.my.maintained.code.common.app.FqmApp$$anon$33.processInternal(FqmApp.scala:110) at de.my.maintained.code.common.transformer.Transformer.process(Transformer.scala:26) at de.my.maintained.code.common.transformer.Transformer.process$(Transformer.scala:24) at de.my.maintained.code.common.app.FqmApp$$anon$33.process(FqmApp.scala:110) at de.my.maintained.code.business.aws.BusinessStage2Forecast$.main(BusinessStage2Forecast.scala:10) at de.my.maintained.code.business.aws.BusinessStage2Forecast.main(BusinessStage2Forecast.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3 at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:95) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_parquetMax_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142) at org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:143) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840)
A little earlier in logfile i found the following:
24/03/26 19:32:18 INFO DAGScheduler: Submitting 16 missing tasks from ShuffleMapStage 12 (MapPartitionsRDD[28] at first at MyMaintainedClass.scala:56) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
if i jump to the code i found this:
52: val first_row = 53: df_usage_current_and_next_month 54: .filter(args.billingDate.filter_current_month($"year", $"month")) 55: .withColumn("DATE_ID", $"year" * 10000 + $"month" * 100 + date_format(dateColumn, "dd").cast(IntegerType)) 56: .agg(max("DATE_ID")).first()
The Problem seems to be occur in the last row ".agg(max("DATE_ID")).first()"
So next i have identified all placed with this exception. All of them use aggregation (min/max/count) with the call of first() after that.
After that i searched all code placed with the first() or head() function in our code base and i found one example without occuring an ArrayIndexOutOfBoundException. The StackOverFlow (link at the bottom), post gave me an hint. Every time we do an spark.read (parquet) with an aggregation and calling first function AND using the same DataFrame after that for other calculations & filterings & writing we got an ArrayIndexOutOfBounds Exception. If we still only do agg and first there is no problem.
So there must be something mutable while reading the DataSource and split the executing plan into two ways. (don`t know what there happen exactly). In my opinion, an optimization mechanism intervenes there, which removes certain columns that are supposedly not needed but needed. Unfortunately i`m not able to reproduce it locally, only in AWS EMR Cluster. (maybe there is a different Configuration)
Workaround not getting the ArrayIndexOutOfBoundException
1. using an explicit spark.read for every agg an first function
OR
2. using an persist() between agg and first function
#similar problem mentioned on stackoverflow
https://stackoverflow.com/questions/53483406/spark-sql-dataframe-count-gives-java-lang-arrayindexoutofboundsexception
Versions Tested:
Spark
3.3.0 (no problem) (emr 6.11.1)
3.4.1 (ArrayIndexOutOfBoundsException) (emr 6.15.1)
3.5.0 (ArrayIndexOutOfBoundsException) (emr 7.0.0)