Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 2.3.1, 2.3.2, 2.4.0
- Labels: None
- Environment:
  spark 2.4.0 streaming job
  java 1.8
  scala 2.11.12
Description
We encountered an intermittent StackOverflowError with a stack trace similar to:
Exception in thread "JobGenerator" java.lang.StackOverflowError
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
The thread name has been observed to be either "JobGenerator" or "streaming-start", depending on where in the lifecycle of the job the problem occurs. It appears to occur only in streaming jobs with both checkpointing and the WAL enabled; this has prevented us from upgrading to v2.4.0.
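For reference, a minimal sketch of a driver setup that exercises this combination (app name, host, port, and paths are illustrative, not from our actual job); the receiver WAL is enabled via spark.streaming.receiver.writeAheadLog.enable and checkpointing via StreamingContext.checkpoint:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // illustrative path

    def createContext(): StreamingContext = {
      val conf = new SparkConf()
        .setAppName("wal-checkpoint-sketch")
        // The receiver write-ahead log; together with checkpointing below,
        // this is the combination under which we see the error
        .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDir)
      val lines = ssc.socketTextStream("localhost", 9999) // any receiver-based source
      lines.count().print()
      ssc
    }

    // Recover from the checkpoint if one exists, otherwise build a fresh context
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}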
Via debugging, we tracked this down to allocateBlocksToBatch in ReceivedBlockTracker:
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).clone())
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      streamIds.foreach(getReceivedBlockQueue(_).clear())
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    //    possibly processed batch job or half-processed batch job need to be processed again,
    //    so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    //    lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
  }
}
Prior to 2.3.1, this code used
getReceivedBlockQueue(streamId).dequeueAll(x => true)
but it was changed as part of SPARK-23991 to
getReceivedBlockQueue(streamId).clone()
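We believe the practical difference is the type that ends up inside AllocatedBlocks: dequeueAll appears to materialize the blocks into a flat ArrayBuffer, whereas clone() returns another mutable.Queue, which in Scala 2.11 is a MutableList backed by singly-linked LinkedList cells; default Java serialization then walks the next chain recursively, one stack frame per element. A standalone sketch of that difference (no Spark classes involved; assumes Scala 2.11/2.12 collection internals):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

object QueueCloneSerializationDemo {
  private def javaSerialize(obj: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(obj) finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val queue = mutable.Queue((1 to 200000): _*)

    // Flat copy, comparable to the old dequeueAll path: serializes fine
    javaSerialize(queue.toVector)

    // Linked-list clone, comparable to the new clone() path: expected to
    // overflow a default-sized thread stack
    try javaSerialize(queue.clone())
    catch { case _: StackOverflowError => println("StackOverflowError, as in the report") }
  }
}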
We have not been able to reproduce this by testing the method above directly, but we have written a test that reproduces the failure by putting a large number of values into the queue:
// Note: several of these types are private[streaming]/private[spark], so this
// test has to live under the org.apache.spark.streaming package to compile.
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.receiver.WriteAheadLogBasedStoreResult
import org.apache.spark.streaming.scheduler.{AllocatedBlocks, BatchAllocationEvent, ReceivedBlockInfo}
import org.apache.spark.streaming.util.FileBasedWriteAheadLogSegment
import org.apache.spark.util.Utils
import org.scalatest.FunSpec
import org.slf4j.LoggerFactory

import scala.collection.mutable

class SerializationFailureTest extends FunSpec {

  private val logger = LoggerFactory.getLogger(getClass)

  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

  describe("Queue") {
    it("should be serializable") {
      runTest(1062)
    }
    it("should not be serializable") {
      runTest(1063)
    }
    it("should DEFINITELY not be serializable") {
      runTest(199952)
    }
  }

  private def runTest(mx: Int): Array[Byte] = {
    try {
      val random = new scala.util.Random()
      val queue = new ReceivedBlockQueue()
      for (_ <- 0 until mx) {
        queue += ReceivedBlockInfo(
          streamId = 0,
          numRecords = Some(random.nextInt(5)),
          metadataOption = None,
          blockStoreResult = WriteAheadLogBasedStoreResult(
            blockId = StreamBlockId(0, random.nextInt()),
            numRecords = Some(random.nextInt(5)),
            walRecordHandle = FileBasedWriteAheadLogSegment(
              path = s"""hdfs://foo.bar.com:8080/spark/streaming/BAZ/00007/receivedData/0/log-${random.nextInt()}-${random.nextInt()}""",
              offset = random.nextLong(),
              length = random.nextInt()
            )
          )
        )
      }
      val record = BatchAllocationEvent(
        Time(1548320400000L),
        AllocatedBlocks(
          Map(
            0 -> queue
          )
        )
      )
      Utils.serialize(record)
    } catch {
      case t: Throwable =>
        fail(t)
    }
  }
}
In my tests, serialization seemed to fail once there were ~1064 elements in the queue. I'm assuming this is actually a Scala bug, though I haven't tried reproducing it without the Spark objects involved.
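One observation supporting a recursion-depth explanation rather than a size limit: the cutoff should track the thread stack size, not the payload's byte size. A probe for checking that (a hypothetical helper, not part of our job; if the cutoff moves as stackBytes changes, the failure is depth-bound):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicBoolean

object StackSizeProbe {
  // Serializes the payload on a thread with an explicit stack size.
  def serializesOn(stackBytes: Long, payload: AnyRef): Boolean = {
    val ok = new AtomicBoolean(false)
    val probe = new Thread(null, new Runnable {
      override def run(): Unit = {
        try {
          val out = new ObjectOutputStream(new ByteArrayOutputStream())
          try out.writeObject(payload) finally out.close()
          ok.set(true)
        } catch {
          case _: StackOverflowError => // expected for deep object graphs
        }
      }
    }, "ser-probe", stackBytes)
    probe.start()
    probe.join()
    ok.get()
  }
}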
I expect this could be solved by converting the cloned queue to a different Seq implementation.
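For example (a hypothetical change, not a patch we have tested against Spark itself): AllocatedBlocks appears to accept any Map[Int, Seq[ReceivedBlockInfo]], so converting to an immutable Vector in allocateBlocksToBatch would keep the WAL payload's contents identical while avoiding the linked-list object graph:

val streamIdToBlocks = streamIds.map { streamId =>
  // toVector copies into a flat structure that serializes without one stack
  // frame per element; the clone() then arguably becomes redundant, since
  // toVector already copies the queue's contents under the same lock
  (streamId, getReceivedBlockQueue(streamId).toVector)
}.toMap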