Spark / SPARK-26734

StackOverflowError on WAL serialization caused by large receivedBlockQueue


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.1, 2.3.2, 2.4.0
    • Fix Version/s: 2.3.4, 2.4.1, 3.0.0
    • Labels: None
    • Environment: Spark 2.4.0 streaming job, Java 1.8, Scala 2.11.12

    Description

      We encountered an intermittent StackOverflowError with a stack trace similar to:

       

      Exception in thread "JobGenerator" java.lang.StackOverflowError
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
      at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
      at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

      The thread name has been seen as either "JobGenerator" or "streaming-start", depending on where in the job's lifecycle the problem occurs. It appears to occur only in streaming jobs with checkpointing and the WAL enabled; this has prevented us from upgrading to v2.4.0.
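
      For reference, a minimal sketch of the kind of setup that exhibits this, assuming a receiver-based DStream with checkpointing and the receiver write-ahead log enabled. The object name, checkpoint path, source host/port, batch interval, and master below are placeholders rather than our real values:

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object WalCheckpointSketch {
        def main(args: Array[String]): Unit = {
          // Placeholder checkpoint location; only the WAL flag and the checkpoint call
          // are relevant to this issue.
          val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

          val conf = new SparkConf()
            .setAppName("wal-checkpoint-sketch")
            .setMaster("local[2]") // local master for illustration; submit normally in a real job
            // Receiver write-ahead log; we have only seen the failure with this enabled.
            .set("spark.streaming.receiver.writeAheadLog.enable", "true")

          def createContext(): StreamingContext = {
            val ssc = new StreamingContext(conf, Seconds(10))
            ssc.checkpoint(checkpointDir)
            // Any receiver-based source will do for the sketch; socketTextStream is a stand-in.
            ssc.socketTextStream("localhost", 9999).count().print()
            ssc
          }

          // Recover from the checkpoint if one exists, otherwise build a fresh context.
          val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
          ssc.start()
          ssc.awaitTermination()
        }
      }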

       

      Via debugging, we tracked this down to allocateBlocksToBatch in ReceivedBlockTracker:

      /**
       * Allocate all unallocated blocks to the given batch.
       * This event will get written to the write ahead log (if enabled).
       */
      def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
        if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
          val streamIdToBlocks = streamIds.map { streamId =>
            (streamId, getReceivedBlockQueue(streamId).clone())
          }.toMap
          val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
          if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
            streamIds.foreach(getReceivedBlockQueue(_).clear())
            timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
            lastAllocatedBatchTime = batchTime
          } else {
            logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
          }
        } else {
          // This situation occurs when:
          // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
          // possibly processed batch job or half-processed batch job need to be processed again,
          // so the batchTime will be equal to lastAllocatedBatchTime.
          // 2. Slow checkpointing makes recovered batch time older than WAL recovered
          // lastAllocatedBatchTime.
          // This situation will only occurs in recovery time.
          logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
        }
      }
      

      Prior to 2.3.1, this code did

      getReceivedBlockQueue(streamId).dequeueAll(x => true)

      but it was changed as part of SPARK-23991 to

      getReceivedBlockQueue(streamId).clone()
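
      The practical difference is the type that ends up inside AllocatedBlocks, and therefore inside the BatchAllocationEvent written to the WAL. A small stand-alone sketch of ours (the object name is arbitrary, and String stands in for ReceivedBlockInfo, since only the collection types matter):

      import scala.collection.mutable

      object QueueVsSeqSketch {
        def main(args: Array[String]): Unit = {
          val queue = mutable.Queue("a", "b", "c")

          // Since SPARK-23991: clone() copies the queue itself, so AllocatedBlocks captures
          // another mutable.Queue (in Scala 2.11 a linked, cell-per-element structure), which
          // is what we suspect blows the stack under default Java serialization.
          val copied = queue.clone()

          // Prior to 2.3.1: dequeueAll drained the queue and handed back a plain Seq.
          val drained: Seq[String] = queue.dequeueAll(_ => true)

          println(s"clone result:      ${copied.getClass.getName}")
          println(s"dequeueAll result: ${drained.getClass.getName}")
        }
      }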

      We have not been able to reproduce this in a test of the actual method above, but we have written a test that reproduces it by putting a large number of values into the queue:

       

      // Note: to compile, this test needs to sit in (a subpackage of) org.apache.spark.streaming,
      // because ReceivedBlockInfo, AllocatedBlocks, BatchAllocationEvent, and Utils are
      // package-private to Spark.
      package org.apache.spark.streaming

      import org.apache.spark.storage.StreamBlockId
      import org.apache.spark.streaming.receiver.WriteAheadLogBasedStoreResult
      import org.apache.spark.streaming.scheduler.{AllocatedBlocks, BatchAllocationEvent, ReceivedBlockInfo}
      import org.apache.spark.streaming.util.FileBasedWriteAheadLogSegment
      import org.apache.spark.util.Utils
      import org.scalatest.FunSpec
      import org.slf4j.LoggerFactory

      import scala.collection.mutable

      class SerializationFailureTest extends FunSpec {
      
        private val logger = LoggerFactory.getLogger(getClass)
      
        private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
      
        describe("Queue") {
          it("should be serializable") {
            runTest(1062)
          }
          it("should not be serializable") {
            runTest(1063)
          }
          it("should DEFINITELY not be serializable") {
            runTest(199952)
          }
        }
      
        private def runTest(mx: Int): Array[Byte] = {
          try {
            val random = new scala.util.Random()
            val queue = new ReceivedBlockQueue()
            for (_ <- 0 until mx) {
              queue += ReceivedBlockInfo(
                streamId = 0,
                numRecords = Some(random.nextInt(5)),
                metadataOption = None,
                blockStoreResult = WriteAheadLogBasedStoreResult(
                  blockId = StreamBlockId(0, random.nextInt()),
                  numRecords = Some(random.nextInt(5)),
                  walRecordHandle = FileBasedWriteAheadLogSegment(
                    path = s"""hdfs://foo.bar.com:8080/spark/streaming/BAZ/00007/receivedData/0/log-${random.nextInt()}-${random.nextInt()}""",
                    offset = random.nextLong(),
                    length = random.nextInt()
                  )
                )
              )
            }
            val record = BatchAllocationEvent(
              Time(1548320400000L), AllocatedBlocks(
                Map(
                  0 -> queue
                )
              )
            )
            Utils.serialize(record)
          } catch {
            case t: Throwable =>
              fail(t)
          }
        }
      }
      
      

      In my tests, serialization seemed to fail once there were ~1064 elements in the queue. I'm assuming this is actually a Scala bug, though I haven't tried reproducing it without the involvement of the Spark objects.
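
      For what it's worth, a minimal sketch of how that assumption could be checked with no Spark classes involved; we have not run this exact snippet, and the object name and element count are arbitrary choices rather than measured values:

      import java.io.{ByteArrayOutputStream, ObjectOutputStream}

      import scala.collection.mutable

      object PlainScalaQueueSerialization {

        // Default Java serialization of an arbitrary object graph; returns the payload size.
        private def javaSerialize(obj: AnyRef): Int = {
          val bytes = new ByteArrayOutputStream()
          val out = new ObjectOutputStream(bytes)
          try out.writeObject(obj) finally out.close()
          bytes.size()
        }

        def main(args: Array[String]): Unit = {
          // Arbitrary "large enough" count; the real threshold depends on -Xss and on how
          // many stack frames each element contributes.
          val n = 200000
          val elements = (1 to n).map(i => s"element-$i")

          // An array-backed copy of the same data serializes with bounded stack depth.
          println(s"Vector:        ${javaSerialize(elements.toVector)} bytes")

          // If the recursion really is in the queue's own representation, this call should
          // throw StackOverflowError without any Spark classes on the stack.
          println(s"mutable.Queue: ${javaSerialize(mutable.Queue(elements: _*))} bytes")
        }
      }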

      I expect this could be solved by transforming the cloned queue into a different type of Seq.
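
      One possible shape of that change, sketched against the method quoted above (this is only a guess at a fix, not necessarily what will end up being merged):

      // Copy each queue into an immutable IndexedSeq instead of cloning the mutable.Queue.
      // The copy is still unaffected by the later getReceivedBlockQueue(_).clear() calls, but
      // the BatchAllocationEvent written to the WAL no longer holds a linked structure that
      // serializes one stack frame per element.
      val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).toIndexedSeq)
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)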

       

    People

      Assignee: Ross M. Lodge (eddardstark)
      Reporter: Ross M. Lodge (eddardstark)