SPARK-23991

Data loss when allocateBlocksToBatch fails to write to the WAL


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.3.1, 2.4.0
    • Component/s: DStreams, Input/Output
    • Labels:
      None
    • Environment:

      spark 2.11

      Description

      With checkpointing and the WAL enabled, the driver writes the allocation of blocks to each batch into HDFS. However, if that write fails as shown in the error log, the blocks of this batch can never be computed by the DAG, because they have already been dequeued from the receivedBlockQueue and are lost.

      error log


      18/04/15 11:11:25 WARN ReceivedBlockTracker: Exception thrown while writing record: BatchAllocationEvent(1523765480000 ms,AllocatedBlocks(Map(0 -> ArrayBuffer()))) to the WriteAheadLog.
      org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
        at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
        at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.allocateBlocksToBatch(ReceivedBlockTracker.scala:118)
        at org.apache.spark.streaming.scheduler.ReceiverTracker.allocateBlocksToBatch(ReceiverTracker.scala:213)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
        ... 12 more
      18/04/15 11:11:25 INFO ReceivedBlockTracker: Possibly processed batch 1523765480000 ms needs to be processed again in WAL recovery
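
The failure mode can be reproduced outside Spark with a small stand-in. The sketch below is only an illustration, not Spark code: `DequeueFirstTracker`, its `String` block ids, and the `walWriteSucceeds` flag (which simulates the result of the WAL write) are all hypothetical names introduced here. It drains the queue before the outcome of the write is known, which is the ordering this report describes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the tracker, reduced to a single stream's queue.
// `walWriteSucceeds` simulates the outcome of the WAL write; it is not a Spark API.
class DequeueFirstTracker(walWriteSucceeds: Boolean) {
  val receivedBlockQueue = mutable.Queue[String]()
  val timeToAllocatedBlocks = mutable.HashMap[Long, Seq[String]]()

  def addBlock(blockId: String): Unit = synchronized {
    receivedBlockQueue.enqueue(blockId)
  }

  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    // The queue is drained before we know whether the WAL write succeeded.
    val blocks = receivedBlockQueue.dequeueAll(_ => true).toList
    if (walWriteSucceeds) {
      timeToAllocatedBlocks.put(batchTime, blocks)
    }
    // On failure, `blocks` is dropped: neither queued nor allocated to any batch.
  }
}

object DequeueFirstDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new DequeueFirstTracker(walWriteSucceeds = false)
    tracker.addBlock("block-0")
    tracker.addBlock("block-1")
    tracker.allocateBlocksToBatch(1523765480000L)
    println(s"queued: ${tracker.receivedBlockQueue.size}, " +
      s"allocated batches: ${tracker.timeToAllocatedBlocks.size}")
  }
}
```

After the simulated failure both the queue and the allocation map are empty, so the two blocks are no longer reachable by any later batch.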

      The relevant code is shown below:

        /**
         * Allocate all unallocated blocks to the given batch.
         * This event will get written to the write ahead log (if enabled).
         */
        def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
          if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
            val streamIdToBlocks = streamIds.map { streamId =>
                (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
            }.toMap
            val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
            if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
              timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
              lastAllocatedBatchTime = batchTime
            } else {
              logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
            }
          } else {
            // This situation occurs when:
            // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
            // possibly processed batch job or half-processed batch job need to be processed again,
            // so the batchTime will be equal to lastAllocatedBatchTime.
            // 2. Slow checkpointing makes recovered batch time older than WAL recovered
            // lastAllocatedBatchTime.
            // This situation will only occurs in recovery time.
            logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
          }
        }
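
One way to avoid the loss is to snapshot the queue, attempt the write, and clear the queue only after the write succeeds. The following is a minimal sketch under stated assumptions, not necessarily the change merged for the fix versions: `SnapshotFirstTracker`, the `String` block ids, and the injected `writeToLog` function are hypothetical and exist only so a failure can be simulated.

```scala
import scala.collection.mutable

// Hypothetical, simplified tracker. `writeToLog` is injected so that a
// failing WAL write can be simulated; it is not the Spark method.
class SnapshotFirstTracker(writeToLog: Seq[String] => Boolean) {
  private val receivedBlockQueue = mutable.Queue[String]()
  private val timeToAllocatedBlocks = mutable.HashMap[Long, Seq[String]]()

  def addBlock(blockId: String): Unit = synchronized {
    receivedBlockQueue.enqueue(blockId)
  }

  def queuedBlocks: Seq[String] = synchronized { receivedBlockQueue.toList }

  def allocatedBlocks(batchTime: Long): Option[Seq[String]] = synchronized {
    timeToAllocatedBlocks.get(batchTime)
  }

  def allocateBlocksToBatch(batchTime: Long): Boolean = synchronized {
    // Copy the queued blocks; the queue itself is left untouched for now.
    val snapshot = receivedBlockQueue.toList
    if (writeToLog(snapshot)) {
      timeToAllocatedBlocks.put(batchTime, snapshot)
      // Safe to clear: addBlock is synchronized on the same monitor,
      // so no block can have been enqueued since the snapshot was taken.
      receivedBlockQueue.clear()
      true
    } else {
      // Blocks stay queued and are picked up by a later batch.
      false
    }
  }
}

object SnapshotFirstDemo {
  def main(args: Array[String]): Unit = {
    var walHealthy = false
    val tracker = new SnapshotFirstTracker(_ => walHealthy)
    tracker.addBlock("block-0")
    tracker.allocateBlocksToBatch(1000L) // simulated write failure, block-0 stays queued
    walHealthy = true
    tracker.addBlock("block-1")
    tracker.allocateBlocksToBatch(2000L) // both blocks are allocated to batch 2000
    println(tracker.allocatedBlocks(2000L))
  }
}
```

The trade-off in this sketch is that blocks from a failed allocation are processed with a later batch rather than the one they arrived in, which delays them but does not drop them.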
      
      
