Spark / SPARK-47615

Aggregate + First() Function - ArrayIndexOutOfBoundsException - ColumnPruning?


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.4.1, 3.5.0
    • Fix Version/s: None
    • Component/s: Optimizer
    • Labels: None
    • Environment:
      Amazon EMR version: emr-7.0.0
      Installed applications: Tez 0.10.2, Spark 3.5.0
      Amazon Linux release: 2023.3.20240312.0
      1 Master Node m6g.xlarge
      2 Core Nodes m6g.2xlarge

    Description

      Currently I'm investigating upgrading our code base from Spark 3.3.0 to 3.5.0 (embedded in a dedicated AWS EMR cluster).
       
      I get the following exception when I execute my code on the cluster; when I run local unit tests, the code runs as expected without any exception.
       
       

      24/03/26 19:32:19 INFO RecordServerQueryListener: Cleaning up temp directory - /user/KKQI7VHKTMNQZJQNMMZXKH5KYNRPOHXG/application_1711468652551_0023 Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 186) (ip-10-1-1-6.eu-central-1.compute.internal executor 2): java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3 at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:95) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_parquetMax_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142) at org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:143) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840)   Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3067) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3003) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3002) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3002) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1318) at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1318) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1318) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3271) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3205) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3194) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:154) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66) at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:277) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:276) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:558) at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:520) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4411) at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3370) at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4401) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:625) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4399) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:129) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:165) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:165) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:164) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4399) at org.apache.spark.sql.Dataset.head(Dataset.scala:3370) at org.apache.spark.sql.Dataset.head(Dataset.scala:3377) at org.apache.spark.sql.Dataset.first(Dataset.scala:3384) at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.determineMaxDateId(MyMaintainedClass.scala:56) at de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.processInternal(MyMaintainedClass.scala:25) at 
de.my.maintained.code.business.transformer.MyMaintainedClass$BusinessForecastNonRecurringTransformer.processInternal$(MyMaintainedClass.scala:22) at de.my.maintained.code.common.app.FqmApp$$anon$35.processInternal(FqmApp.scala:112) at de.my.maintained.code.common.transformer.Transformer.process(Transformer.scala:26) at de.my.maintained.code.common.transformer.Transformer.process$(Transformer.scala:24) at de.my.maintained.code.common.app.FqmApp$$anon$35.process(FqmApp.scala:112) at de.my.maintained.code.business.transformer.BusinessForecastTransformerComponent$BusinessForecastTransformer.processInternal(BusinessForecastTransformerComponent.scala:35) at de.my.maintained.code.business.transformer.BusinessForecastTransformerComponent$BusinessForecastTransformer.processInternal$(BusinessForecastTransformerComponent.scala:26) at de.my.maintained.code.common.app.FqmApp$$anon$33.processInternal(FqmApp.scala:110) at de.my.maintained.code.common.transformer.Transformer.process(Transformer.scala:26) at de.my.maintained.code.common.transformer.Transformer.process$(Transformer.scala:24) at de.my.maintained.code.common.app.FqmApp$$anon$33.process(FqmApp.scala:110) at de.my.maintained.code.business.aws.BusinessStage2Forecast$.main(BusinessStage2Forecast.scala:10) at de.my.maintained.code.business.aws.BusinessStage2Forecast.main(BusinessStage2Forecast.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3 at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:95) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_parquetMax_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnaraggregatetorow_nextBatch_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:142) at org.apache.spark.shuffle.ShuffleWriteProcessor.doWrite(ShuffleWriteProcessor.scala:45) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) at org.apache.spark.scheduler.Task.run(Task.scala:143) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:629) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:632) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:840)

       
       
      A little earlier in the log file I found the following:
       
      24/03/26 19:32:18 INFO DAGScheduler: Submitting 16 missing tasks from ShuffleMapStage 12 (MapPartitionsRDD[28] at first at MyMaintainedClass.scala:56) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
       
       
      If I jump to the code, I find this:
       
       

      52: val first_row =
      53:  df_usage_current_and_next_month
      54:   .filter(args.billingDate.filter_current_month($"year", $"month"))
      55:   .withColumn("DATE_ID", $"year" * 10000 + $"month" * 100 + date_format(dateColumn, "dd").cast(IntegerType))
      56:   .agg(max("DATE_ID")).first()  

       
      The problem seems to occur in the last line, ".agg(max("DATE_ID")).first()".
       
      So next I identified all places with this exception. All of them use an aggregation (min/max/count) followed by a call to first().
       
      After that I searched our code base for all places that call first() or head() and found one example that does not throw an ArrayIndexOutOfBoundsException. The Stack Overflow post (link at the bottom) gave me a hint: every time we do a spark.read (Parquet) with an aggregation, call first(), AND use the same DataFrame afterwards for other calculations, filtering, and writing, we get an ArrayIndexOutOfBoundsException. If we only do agg and first(), there is no problem.
       
      So something must change while reading the data source and splitting the execution plan into two branches (I don't know exactly what happens there). In my opinion an optimization mechanism intervenes there which removes certain columns that are supposedly not needed but actually are needed. Unfortunately I'm not able to reproduce it locally, only on the AWS EMR cluster (maybe there is a different configuration).
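       
      For illustration, a minimal sketch of the pattern that triggers the exception on our cluster (the path, app name and column name below are placeholders, not our real code):
       
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.max
       
      val spark = SparkSession.builder().appName("agg-first-repro").getOrCreate()
      import spark.implicits._
       
      // One spark.read whose DataFrame is reused by two branches (placeholder path).
      val df = spark.read.parquet("s3://some-bucket/usage/")
       
      // Branch 1: aggregation followed by first() -- this is where the
      // ArrayIndexOutOfBoundsException shows up on the cluster.
      val maxDateId = df.agg(max($"DATE_ID")).first().getAs[Int](0)
       
      // Branch 2: the same DataFrame is reused afterwards for further
      // filtering and writing.
      df.filter($"DATE_ID" === maxDateId)
        .write.mode("overwrite").parquet("s3://some-bucket/out/")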
       
      Workarounds that avoid the ArrayIndexOutOfBoundsException:
       
      1. use an explicit spark.read for every agg + first() call
      OR
      2. add a persist() between the agg and the first() call (see the sketch below)
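       
      A sketch of workaround 2 applied to the snippet above (df_usage_current_and_next_month, args and dateColumn come from the code shown earlier; the persist() placement just follows what we did and is not a statement about the root cause):
       
      import org.apache.spark.sql.functions.{date_format, max}
      import org.apache.spark.sql.types.IntegerType
      import spark.implicits._ // spark is the active SparkSession
       
      val first_row =
        df_usage_current_and_next_month
          .filter(args.billingDate.filter_current_month($"year", $"month"))
          .withColumn("DATE_ID",
            $"year" * 10000 + $"month" * 100 + date_format(dateColumn, "dd").cast(IntegerType))
          .agg(max("DATE_ID"))
          .persist() // materialize the single-row aggregation result; with this in place we no longer see the exception
          .first()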
       
       
      A similar problem mentioned on Stack Overflow:
      https://stackoverflow.com/questions/53483406/spark-sql-dataframe-count-gives-java-lang-arrayindexoutofboundsexception
       
       
      Versions Tested:
       
      Spark
      3.3.0 (no problem) (emr 6.11.1)
      3.4.1 (ArrayIndexOutOfBoundsException) (emr 6.15.1)
      3.5.0 (ArrayIndexOutOfBoundsException) (emr 7.0.0)
       

          People

            Assignee: Unassigned
            Reporter: Frederik Schreiber (schreiber)
            Votes: 0
            Watchers: 1
