SPARK-23480

NullPointerException in AppendOnlyMap.growTable


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.1
    • Fix Version/s: None
    • Component/s: Block Manager, Spark Core

    Description

      I've found a rather strange NPE in AppendOnlyMap. The stack trace is as follows:

      java.lang.NullPointerException
      	at org.apache.spark.util.collection.AppendOnlyMap.growTable(AppendOnlyMap.scala:248)
      	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable(SizeTrackingAppendOnlyMap.scala:38)
      	at org.apache.spark.util.collection.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:204)
      	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:147)
      	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
      	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
      	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
      	at org.apache.spark.scheduler.Task.run(Task.scala:108)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)

      The code in question, according to GitHub, is the following, in particular the last line, which starts with `growThreshold`:

      /** Double the table's size and re-hash everything */
      protected def growTable() {
        // capacity < MAXIMUM_CAPACITY (2 ^ 29) so capacity * 2 won't overflow
        val newCapacity = capacity * 2
        require(newCapacity <= MAXIMUM_CAPACITY, s"Can't contain more than ${growThreshold} elements")
        val newData = new Array[AnyRef](2 * newCapacity)
        val newMask = newCapacity - 1
        // Insert all our old values into the new array. Note that because our old keys are
        // unique, there's no need to check for equality here when we insert.
        var oldPos = 0
        while (oldPos < capacity) {
          if (!data(2 * oldPos).eq(null)) {
            val key = data(2 * oldPos)
            val value = data(2 * oldPos + 1)
            var newPos = rehash(key.hashCode) & newMask
            var i = 1
            var keepGoing = true
            while (keepGoing) {
              val curKey = newData(2 * newPos)
              if (curKey.eq(null)) {
                newData(2 * newPos) = key
                newData(2 * newPos + 1) = value
                keepGoing = false
              } else {
                val delta = i
                newPos = (newPos + delta) & newMask
                i += 1
              }
            }
          }
          oldPos += 1
        }
        data = newData
        capacity = newCapacity
        mask = newMask
        growThreshold = (LOAD_FACTOR * newCapacity).toInt
      }
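
      One thing worth noting from the quoted method: null entries in `data` are expected (they mark empty slots) and are compared with `eq`, which Scala compiles to a plain reference comparison, so a null slot by itself can't NPE. Every other dereference is either on the freshly allocated `newData` or on a `key` already proven non-null, so the only statement here that can throw an NPE is an array load on a null `data` reference, i.e. the backing array itself being gone. How `data` could end up null isn't clear from the snippet alone. A minimal, self-contained sketch of that distinction (hypothetical code, not Spark's class; Spark's `rehash` is replaced by a plain `hashCode` for brevity):

      // A stripped-down model of growTable's array accesses.
      object GrowTableNpeSketch {
        def growTable(data: Array[AnyRef], capacity: Int): Array[AnyRef] = {
          val newCapacity = capacity * 2
          val newData = new Array[AnyRef](2 * newCapacity)
          val newMask = newCapacity - 1
          var oldPos = 0
          while (oldPos < capacity) {
            // eq(null) on a null *entry* is safe (reference compare); the array
            // load data(2 * oldPos) is what throws when data itself is null.
            if (!data(2 * oldPos).eq(null)) {
              val key = data(2 * oldPos)
              val value = data(2 * oldPos + 1)
              var newPos = key.hashCode & newMask
              var i = 1
              var keepGoing = true
              while (keepGoing) {
                if (newData(2 * newPos).eq(null)) {
                  newData(2 * newPos) = key
                  newData(2 * newPos + 1) = value
                  keepGoing = false
                } else {
                  newPos = (newPos + i) & newMask  // same triangular probing as the original
                  i += 1
                }
              }
            }
            oldPos += 1
          }
          newData
        }

        def main(args: Array[String]): Unit = {
          // A sparse table with null (empty) slots grows without trouble:
          growTable(Array[AnyRef]("k", "v", null, null), 2)
          // A null table reference reproduces the NPE shape from the trace:
          try growTable(null, 2)
          catch { case e: NullPointerException => println(s"reproduced: $e") }
        }
      }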


      Unfortunately I haven't got a simple repro case for this; it's coming from production logs as I try to track down seemingly random executor failures. Offhand, I suspect the issue is an OOM condition that isn't being reported well.
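
      If the OOM theory holds, the allocation in growTable is a plausible stress point: each grow allocates the doubled array in one shot, and near the 2^29 cap noted in the source comment that is a very large single allocation. A rough sizing sketch (assuming a 64-bit JVM with 8-byte references; compressed oops would halve these figures):

      // Back-of-the-envelope size of the newData allocation in growTable,
      // assuming 8 bytes per object reference.
      object GrowTableSizing {
        val MaximumCapacity: Long = 1L << 29   // cap from the source comment
        val BytesPerRef: Long = 8              // assumption: no compressed oops

        // growTable doubles capacity and stores two slots (key, value) per entry.
        def newDataBytes(capacity: Long): Long = 2 * (capacity * 2) * BytesPerRef

        def main(args: Array[String]): Unit = {
          // The final doubling, from 2^28 to the 2^29 cap, is a single 8 GiB array:
          println(s"${newDataBytes(MaximumCapacity / 2) / (1L << 30)} GiB")
        }
      }

      That said, a failed allocation should surface as java.lang.OutOfMemoryError rather than an NPE, so heap pressure alone doesn't obviously explain the trace above.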


          People

            Assignee: Unassigned
            Reporter: Erik LaBianca (easel)
            Votes: 0
            Watchers: 4
