Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-26734

StackOverflowError on WAL serialization caused by large receivedBlockQueue

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments


    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.1, 2.3.2, 2.4.0
    • Fix Version/s: 2.3.4, 2.4.1, 3.0.0
    • Labels:
    • Environment:

      spark 2.4.0 streaming job

      java 1.8

      scala 2.11.12


      We encountered an intermittent StackOverflowError with a stack trace similar to:


      Exception in thread "JobGenerator" java.lang.StackOverflowError
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

      The name of the thread has been seen to be either "JobGenerator" or "streaming-start", depending on when in the lifecycle of the job the problem occurs.  It appears to only occur in streaming jobs with checkpointing and WAL enabled; this has prevented us from upgrading to v2.4.0.


      Via debugging, we tracked this down to allocateBlocksToBatch in ReceivedBlockTracker:

       * Allocate all unallocated blocks to the given batch.
       * This event will get written to the write ahead log (if enabled).
      def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
        if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
          val streamIdToBlocks = streamIds.map { streamId =>
            (streamId, getReceivedBlockQueue(streamId).clone())
          val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
          if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
            timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
            lastAllocatedBatchTime = batchTime
          } else {
            logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
        } else {
          // This situation occurs when:
          // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
          // possibly processed batch job or half-processed batch job need to be processed again,
          // so the batchTime will be equal to lastAllocatedBatchTime.
          // 2. Slow checkpointing makes recovered batch time older than WAL recovered
          // lastAllocatedBatchTime.
          // This situation will only occurs in recovery time.
          logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")

      Prior to 2.3.1, this code did

      getReceivedBlockQueue(streamId).dequeueAll(x => true)

      but it was changed as part of SPARK-23991 to


      We've not been able to reproduce this in a test of the actual above method, but we've been able to produce a test that reproduces it by putting a lot of values into the queue:


      class SerializationFailureTest extends FunSpec {
        private val logger = LoggerFactory.getLogger(getClass)
        private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
        describe("Queue") {
          it("should be serializable") {
          it("should not be serializable") {
          it("should DEFINITELY not be serializable") {
        private def runTest(mx: Int): Array[Byte] = {
          try {
            val random = new scala.util.Random()
            val queue = new ReceivedBlockQueue()
            for (_ <- 0 until mx) {
              queue += ReceivedBlockInfo(
                streamId = 0,
                numRecords = Some(random.nextInt(5)),
                metadataOption = None,
                blockStoreResult = WriteAheadLogBasedStoreResult(
                  blockId = StreamBlockId(0, random.nextInt()),
                  numRecords = Some(random.nextInt(5)),
                  walRecordHandle = FileBasedWriteAheadLogSegment(
                    path = s"""hdfs://foo.bar.com:8080/spark/streaming/BAZ/00007/receivedData/0/log-${random.nextInt()}-${random.nextInt()}""",
                    offset = random.nextLong(),
                    length = random.nextInt()
            val record = BatchAllocationEvent(
              Time(1548320400000L), AllocatedBlocks(
                  0 -> queue
          } catch {
            case t: Throwable =>

      In my tests it seemed like the serialization would fail if there were ~1064 elements in the queue.  I'm assuming that this is actually a scala bug, though I haven't tried reproducing it without the involvement of the spark objects.

      I expect this could be solved by transforming the cloned queue into a different type of Seq.





            • Assignee:
              eddardstark Ross M. Lodge
              eddardstark Ross M. Lodge


              • Created:

                Issue deployment