Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Incomplete
- Affects Version/s: 2.1.1
- Fix Version/s: None
Description
I've found a rather strange NPE in AppendOnlyMap. The stack trace is as follows:
```
java.lang.NullPointerException
	at org.apache.spark.util.collection.AppendOnlyMap.growTable(AppendOnlyMap.scala:248)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.growTable(SizeTrackingAppendOnlyMap.scala:38)
	at org.apache.spark.util.collection.AppendOnlyMap.incrementSize(AppendOnlyMap.scala:204)
	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:147)
	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
The code in question, according to GitHub, is the following, particularly the last line starting with `growThreshold`:
```scala
/** Double the table's size and re-hash everything */
protected def growTable() {
  // capacity < MAXIMUM_CAPACITY (2 ^ 29) so capacity * 2 won't overflow
  val newCapacity = capacity * 2
  require(newCapacity <= MAXIMUM_CAPACITY, s"Can't contain more than ${growThreshold} elements")
  val newData = new Array[AnyRef](2 * newCapacity)
  val newMask = newCapacity - 1
  // Insert all our old values into the new array. Note that because our old keys are
  // unique, there's no need to check for equality here when we insert.
  var oldPos = 0
  while (oldPos < capacity) {
    if (!data(2 * oldPos).eq(null)) {
      val key = data(2 * oldPos)
      val value = data(2 * oldPos + 1)
      var newPos = rehash(key.hashCode) & newMask
      var i = 1
      var keepGoing = true
      while (keepGoing) {
        val curKey = newData(2 * newPos)
        if (curKey.eq(null)) {
          newData(2 * newPos) = key
          newData(2 * newPos + 1) = value
          keepGoing = false
        } else {
          val delta = i
          newPos = (newPos + delta) & newMask
          i += 1
        }
      }
    }
    oldPos += 1
  }
  data = newData
  capacity = newCapacity
  mask = newMask
  growThreshold = (LOAD_FACTOR * newCapacity).toInt
}
```
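For context, the map stores keys and values flattened into one `AnyRef` array (key at `2 * pos`, value at `2 * pos + 1`) and resolves collisions by increasing the probe step on each attempt. A minimal standalone sketch of that insertion scheme (a simplified illustration, not Spark's actual class; `ProbeDemo` and `insert` are hypothetical names):

```scala
object ProbeDemo {
  // Insert a key/value pair into a flat open-addressing table whose
  // length is 2 * capacity, where capacity is a power of two.
  def insert(table: Array[AnyRef], key: AnyRef, value: AnyRef): Unit = {
    val capacity = table.length / 2
    val mask = capacity - 1
    var pos = key.hashCode & mask
    var i = 1
    while (table(2 * pos) ne null) {
      // On the i-th collision, advance by i (triangular probing).
      pos = (pos + i) & mask
      i += 1
    }
    table(2 * pos) = key
    table(2 * pos + 1) = value
  }

  def main(args: Array[String]): Unit = {
    val table = new Array[AnyRef](16) // capacity 8
    Seq("a", "b", "c").foreach(k => insert(table, k, k + "-val"))
    // Three key/value pairs occupy six array slots.
    println(table.count(_ ne null))
  }
}
```

Note that `growTable` itself dereferences `data` in the copy loop and allocates a fresh array up front, so nothing in the loop above can throw an NPE on its own; the failure reported here implicates the surrounding state of the map rather than the probing logic.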
Unfortunately I don't have a simple repro case for this; it's coming from production logs as I try to track down seemingly random executor failures. Offhand, I suspect the issue is an OOM condition that isn't being well reported.