Spark / SPARK-27558

NPE in TaskCompletionListener due to Spark OOM in UnsafeExternalSorter causing tasks to hang


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.3, 2.4.2
    • Fix Version/s: 2.4.5, 3.0.0
    • Component/s: Spark Core
    • Labels:
      None

      Description

      We see an NPE in UnsafeInMemorySorter.getMemoryUsage because the array it accesses is null. This looks to be caused by a Spark OOM raised while UnsafeExternalSorter is spilling: in the stack trace below, the allocation in UnsafeInMemorySorter.reset fails.

      This is likely a symptom of https://issues.apache.org/jira/browse/SPARK-21492. The real question for this ticket is whether we could handle this more gracefully rather than throwing an NPE. For example:

      Remove this line:

      https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L182

      and store the result of this allocation in a temporary, assigning it to "array" only after it succeeds:

      https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java#L186

      That way, when the allocation fails, we don't end up with a null "array". This state is causing one of our jobs to hang indefinitely (we think) due to the original allocation error.
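
The suggestion above can be illustrated with a small, self-contained sketch. This is not Spark's code: `Sorter`, `resetFreeFirst`, `resetAllocateFirst`, and `failingAfterFirst` are hypothetical names, a plain `long[]` stands in for the sorter's pointer array, and an `IllegalStateException` stands in for the failed memory acquisition. It assumes the failing pattern is "clear the field, then allocate", as the description implies.

```java
import java.util.function.IntFunction;

class SorterResetSketch {

    /** Hypothetical, simplified stand-in for a sorter that owns one pointer array. */
    static class Sorter {
        private long[] array;
        private final IntFunction<long[]> allocator;
        private final int initialSize;

        Sorter(IntFunction<long[]> allocator, int initialSize) {
            this.allocator = allocator;
            this.initialSize = initialSize;
            this.array = allocator.apply(initialSize);
        }

        /** Clear first, then allocate: a failed allocation leaves array == null. */
        void resetFreeFirst() {
            array = null;
            array = allocator.apply(initialSize);
        }

        /** Allocate into a temporary first: a failed allocation leaves the old array in place. */
        void resetAllocateFirst() {
            long[] fresh = allocator.apply(initialSize);
            array = fresh;
        }

        /** Dereferences array, so it throws NullPointerException when array is null. */
        long getMemoryUsage() {
            return array.length * 8L;
        }
    }

    /** Allocator that succeeds once and then fails, to mimic memory exhaustion. */
    static IntFunction<long[]> failingAfterFirst() {
        boolean[] used = {false};
        return size -> {
            if (used[0]) {
                throw new IllegalStateException("Unable to acquire memory");
            }
            used[0] = true;
            return new long[size];
        };
    }

    /** Triggers a failed reset, then reports what a later cleanup call would see. */
    static String cleanupAfterFailedReset(boolean allocateFirst) {
        Sorter sorter = new Sorter(failingAfterFirst(), 16);
        try {
            if (allocateFirst) {
                sorter.resetAllocateFirst();
            } else {
                sorter.resetFreeFirst();
            }
        } catch (IllegalStateException expected) {
            // Simulated allocation failure; the task would now run its cleanup.
        }
        try {
            return "usage=" + sorter.getMemoryUsage();
        } catch (NullPointerException e) {
            return "NPE";
        }
    }

    public static void main(String[] args) {
        System.out.println("free first:     " + cleanupAfterFailedReset(false));
        System.out.println("allocate first: " + cleanupAfterFailedReset(true));
    }
}
```

One trade-off of allocating first is that the old and new buffers are held at the same time for a moment, which may matter precisely when memory is already exhausted. A null check in the memory-usage accessor would be another way to avoid the NPE without changing allocation order.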

      Stack trace for reference

      2019-04-23 08:57:14,989 [Executor task launch worker for task 46729] ERROR org.apache.spark.TaskContextImpl  - Error in TaskCompletionListener
      java.lang.NullPointerException
      	at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.getMemoryUsage(UnsafeInMemorySorter.java:208)
      	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getMemoryUsage(UnsafeExternalSorter.java:249)
      	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.updatePeakMemoryUsed(UnsafeExternalSorter.java:253)
      	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.freeMemory(UnsafeExternalSorter.java:296)
      	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:328)
      	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.lambda$new$0(UnsafeExternalSorter.java:178)
      	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
      	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:118)
      	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:131)
      	at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:129)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:129)
      	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
      	at org.apache.spark.scheduler.Task.run(Task.scala:119)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      
      
      2019-04-23 08:57:15,069 [Executor task launch worker for task 46729] ERROR org.apache.spark.executor.Executor  - Exception in task 102.0 in stage 28.0 (TID 46729)
      org.apache.spark.util.TaskCompletionListenerException: null
      
      Previous exception in task: Unable to acquire 65536 bytes of memory, got 0
      	org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
      	org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
      	org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:186)
      	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:229)
      	org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:204)
      	org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:283)
      	org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:96)
      	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:348)
      	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:403)
      	org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
      	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage23.sort_addToSorter_0$(Unknown Source)
      	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage23.processNext(Unknown Source)
      	org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
      	org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.fetchNextRow(WindowExec.scala:314)
      	org.apache.spark.sql.execution.window.WindowExec$$anonfun$11$$anon$1.<init>(WindowExec.scala:323)
      	org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:303)
      	org.apache.spark.sql.execution.window.WindowExec$$anonfun$11.apply(WindowExec.scala:302)
      	org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
      	org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
      	org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
      	org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
      	org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
      	org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
      	org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
      	org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
      	org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
      	org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
      	org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
      	org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
      	org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
      	org.apache.spark.scheduler.Task.run(Task.scala:109)
      	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
      	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	java.lang.Thread.run(Thread.java:748)
      	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:139)
      	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
      	at org.apache.spark.scheduler.Task.run(Task.scala:119)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

              People

              • Assignee: Artsiom Yudovin (ayudovin)
              • Reporter: Alessandro Bellina (abellina)