Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Fixed
- Affects Version/s: 2.2.0
- Labels: None
Description
DiskBlockManager.getAllBlocks assumes that the directories managed by the block manager only contain files corresponding to "valid" block IDs, i.e. those parsable via BlockId.apply. This is not always the case, as demonstrated by the following snippet:
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.storage.StorageLevel

object GetAllBlocksFailure {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf()
      .setMaster("local[*]")
      .setAppName("demo"))

    // Background thread repeatedly listing the blocks on disk while the job runs.
    new Thread {
      override def run(): Unit = {
        while (true) {
          println(SparkEnv.get.blockManager.diskBlockManager.getAllBlocks().length)
          Thread.sleep(10)
        }
      }
    }.start()

    // Shuffle job persisting its input to disk.
    val rdd = sc.range(1, 65536, numSlices = 10)
      .map(x => (x % 4096, x))
      .persist(StorageLevel.DISK_ONLY)
      .reduceByKey { _ + _ }
      .collect()
  }
}
We have a thread computing the number of bytes occupied by the block manager on disk, and it frequently crashes because this assumption is violated. Relevant part of the stack trace:
2017-10-06 11:20:14,287 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
java.lang.IllegalStateException: Unrecognized BlockId: shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be
	at org.apache.spark.storage.BlockId$.apply(BlockId.scala:133)
	at org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
	at org.apache.spark.storage.DiskBlockManager$$anonfun$getAllBlocks$1.apply(DiskBlockManager.scala:103)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.storage.DiskBlockManager.getAllBlocks(DiskBlockManager.scala:103)
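For illustration only, one way to tolerate such files is to skip names that BlockId.apply cannot parse rather than letting the exception escape. The helper below is a minimal sketch under that assumption, not the actual patch; BlockIdParsing and getAllBlocksLenient are hypothetical names used here for the example.

import scala.util.Try
import org.apache.spark.storage.BlockId

object BlockIdParsing {
  // Sketch only: parse each file name with BlockId.apply and drop the ones that
  // throw (e.g. temporary shuffle files such as
  // shuffle_1_2466_0.data.5684dd9e-9fa2-42f5-9dd2-051474e372be),
  // instead of propagating the exception to the caller.
  def getAllBlocksLenient(fileNames: Seq[String]): Seq[BlockId] =
    fileNames.flatMap(name => Try(BlockId(name)).toOption)
}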