Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 0.6.0, 0.6.1
Description
Summary: failures in BlockStore may lead to infinite loops of task failures.
I ran into a situation where a block manager operation failed:
12/10/20 21:25:13 ERROR storage.BlockManagerWorker: Exception handling buffer message
com.esotericsoftware.kryo.SerializationException: Buffer limit exceeded writing object of type: shark.ColumnarWritable
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:492)
    at spark.KryoSerializationStream.writeObject(KryoSerializer.scala:78)
    at spark.serializer.SerializationStream$class.writeAll(Serializer.scala:58)
    at spark.KryoSerializationStream.writeAll(KryoSerializer.scala:73)
    at spark.storage.BlockManager.dataSerialize(BlockManager.scala:834)
    at spark.storage.MemoryStore.getBytes(MemoryStore.scala:72)
    at spark.storage.BlockManager.getLocalBytes(BlockManager.scala:311)
    at spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:79)
    at spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:58)
    at spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:33)
    at spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:33)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
    at scala.collection.Iterator$class.foreach(Iterator.scala:772)
    at scala.collection.IndexedSeqLike$Elements.foreach(IndexedSeqLike.scala:54)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
    at spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:12)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
    at spark.storage.BlockMessageArray.map(BlockMessageArray.scala:12)
    at spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:33)
    at spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:23)
    at spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:23)
    at spark.network.ConnectionManager.spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:276)
    at spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)
This failure appears to have been detected via a fetch failure in the following stage:
12/10/20 21:25:12 INFO scheduler.DAGScheduler: Marking Stage 2 (mapPartitions at Operator.scala:197) for resubmision due to a fetch failure
12/10/20 21:25:12 INFO scheduler.DAGScheduler: The failed fetch was from Stage 3 (mapPartitions at Operator.scala:197); marking it for resubmission
The failed task was retried on the same machine, and it executed without exceptions.
However, the job is unable to make forward progress; the scheduler gets stuck in an infinite loop of the form:
12/10/20 22:23:08 INFO spark.CacheTrackerActor: Asked for current cache locations
12/10/20 22:23:08 INFO scheduler.DAGScheduler: Resubmitting Stage 3 (mapPartitions at Operator.scala:197) because some of its tasks had failed: 220
12/10/20 22:23:08 INFO scheduler.DAGScheduler: Submitting Stage 3 (mapPartitions at Operator.scala:197), which has no missing parents
12/10/20 22:23:08 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 3
12/10/20 22:23:08 INFO cluster.ClusterScheduler: Adding task set 3.4080 with 1 tasks
12/10/20 22:23:08 INFO cluster.TaskSetManager: Starting task 3.4080:0 as TID 5484 on slave 201210202106-1093469194-5050-5222-43: domU-12-31-39-14-5E-5E.compute-1.internal (preferred)
12/10/20 22:23:08 INFO cluster.TaskSetManager: Serialized task 3.4080:0 as 7605 bytes in 0 ms
12/10/20 22:23:09 INFO cluster.TaskSetManager: Finished TID 5484 in 820 ms (progress: 1/1)
12/10/20 22:23:09 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3, 220)
12/10/20 22:23:09 INFO scheduler.DAGScheduler: ShuffleMapTask finished with host domU-12-31-39-14-5E-5E.compute-1.internal
12/10/20 22:23:09 INFO scheduler.DAGScheduler: Stage 3 (mapPartitions at Operator.scala:197) finished; looking for newly runnable stages
12/10/20 22:23:09 INFO scheduler.DAGScheduler: running: Set()
12/10/20 22:23:09 INFO scheduler.DAGScheduler: waiting: Set(Stage 2, Stage 1)
12/10/20 22:23:09 INFO scheduler.DAGScheduler: failed: Set()
12/10/20 22:23:09 INFO spark.CacheTrackerActor: Asked for current cache locations
If I look at the worker that is running these tasks, I see an infinite loop of the form:
12/10/20 21:29:29 INFO exec.GroupByPreShuffleOperator: #hash table=24918 #rows=100000 reduction=0.24918 minReduction=0.5
12/10/20 21:29:29 WARN storage.BlockManager: Block shuffle_0_220_0 already exists on this machine; not re-adding it
12/10/20 21:29:29 WARN storage.BlockManager: Block shuffle_0_220_1 already exists on this machine; not re-adding it
12/10/20 21:29:29 WARN storage.BlockManager: Block shuffle_0_220_2 already exists on this machine; not re-adding it
[..]
12/10/20 21:29:29 WARN storage.BlockManager: Block shuffle_0_220_199 already exists on this machine; not re-adding it
12/10/20 21:29:29 INFO executor.Executor: Serialized size of result for 1677 is 350
12/10/20 21:29:29 INFO executor.Executor: Finished task ID 1677
12/10/20 21:29:29 INFO executor.Executor: Running task ID 1678
12/10/20 21:29:29 INFO executor.Executor: Its generation is 3
12/10/20 21:29:29 INFO spark.CacheTracker: Cache key is rdd_4_220
12/10/20 21:29:29 INFO spark.CacheTracker: Found partition in cache!
12/10/20 21:29:29 INFO exec.GroupByPreShuffleOperator: Running Pre-Shuffle Group-By
12/10/20 21:29:29 INFO exec.GroupByPreShuffleOperator: Mapside hash aggregation enabled
12/10/20 21:29:29 INFO exec.GroupByPreShuffleOperator: #hash table=24918 #rows=100000 reduction=0.24918 minReduction=0.5
I'm not sure of the exact cause of this behavior, but I have a guess:
During the original failed execution, the task's output blocks were never actually stored, but their block ids were still added to the BlockManager's blockInfo map. This prevents the blocks from being recomputed on retry and produces the "Block shuffle_* already exists on this machine; not re-adding it" warnings. As a result, the blocks are never stored and the master is never informed of their locations.
This causes the DAG scheduler to repeatedly launch the same task in an infinite loop, saying that it is "Resubmitting Stage * because some of its tasks had failed: *".
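To make the guess concrete, here is a minimal, self-contained Scala sketch of the suspected sequence. The names in it (DanglingBlockInfoSketch, put, blockInfo as a Set, masterLocations) are illustrative stand-ins and deliberately simplified; they are not Spark's actual BlockManager internals. The point is only to show how a put that registers a block id before the store completes can leave a dangling entry that silently suppresses every retry:

// Hypothetical, simplified model of the suspected failure mode; not Spark's real API.
import scala.collection.mutable

object DanglingBlockInfoSketch {

  // Stand-in for the driver-side registry of block locations.
  val masterLocations = mutable.Set[String]()

  // Stand-ins for the executor-local blockInfo map and the stored bytes.
  val blockInfo = mutable.Set[String]()
  val storedBytes = mutable.Map[String, Array[Byte]]()

  // A put that registers the block id before serialization. If serialization
  // throws, blockInfo keeps a dangling entry even though nothing was stored
  // and the master was never told about the block.
  def put(blockId: String, serialize: () => Array[Byte]): Unit = {
    if (blockInfo.contains(blockId)) {
      println(s"Block $blockId already exists on this machine; not re-adding it")
      return // skips both the store and the report to the master
    }
    blockInfo += blockId
    val bytes = serialize() // may throw, e.g. "Buffer limit exceeded"
    storedBytes(blockId) = bytes
    masterLocations += blockId // only reached on success
  }

  def main(args: Array[String]): Unit = {
    // First attempt: the put fails partway through.
    try put("shuffle_0_220_0", () => throw new RuntimeException("Buffer limit exceeded"))
    catch { case e: RuntimeException => println(s"put failed: ${e.getMessage}") }

    // Retried task: finishes cleanly from the executor's point of view, but
    // the dangling blockInfo entry short-circuits the store.
    put("shuffle_0_220_0", () => Array[Byte](1, 2, 3))

    // The scheduler would keep resubmitting the stage because this stays false.
    val known = masterLocations.contains("shuffle_0_220_0")
    println(s"master knows the block's location: $known")
  }
}

Running this sketch prints the "already exists" warning on the retried put and then reports that the master never learns the block's location, which matches the worker and scheduler log excerpts above.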
Issue Links
is related to: SPARK-706 Failures in block manager put leads to task hanging (Closed)