Spark / SPARK-8367

ReliableKafkaReceiver will lose data when `spark.streaming.blockInterval` is 0


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.4.0
    • Fix Version/s: 1.4.1, 1.5.0
    • Component/s: DStreams
    • Labels: None

    Description

      BlockGenerator.scala
        /** Change the buffer to which single records are added to. */
        private def updateCurrentBuffer(time: Long): Unit = synchronized {
          try {
            val newBlockBuffer = currentBuffer
            currentBuffer = new ArrayBuffer[Any]
            if (newBlockBuffer.size > 0) {
              val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
              val newBlock = new Block(blockId, newBlockBuffer)
              listener.onGenerateBlock(blockId)
              blocksForPushing.put(newBlock)  // put is blocking when queue is full
              logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
            }
          } catch {
            case ie: InterruptedException =>
              logInfo("Block updating timer thread was interrupted")
            case e: Exception =>
              reportError("Error in block updating thread", e)
          }
        }

      If spark.streaming.blockInterval is set to 0, the blockId in the code above will always be the same, because time is 0 and blockIntervalMs is 0 as well, so time - blockIntervalMs never changes.
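
      For illustration, here is a minimal, self-contained sketch (plain Scala, not Spark code; the StreamBlockId case class and the hard-coded timer ticks are simplified stand-ins) showing that with a 0 ms block interval every generated block ends up with the identical id:

        object BlockIdCollisionDemo {
          // Simplified stand-in for org.apache.spark.storage.StreamBlockId.
          case class StreamBlockId(receiverId: Int, uniqueId: Long)

          def main(args: Array[String]): Unit = {
            val receiverId = 0
            val blockIntervalMs = 0L      // spark.streaming.blockInterval = 0

            // With a zero interval the block generation timer never advances,
            // so every callback sees the same time value (0 here for simplicity).
            val ticks = Seq(0L, 0L, 0L)

            val ids = ticks.map(time => StreamBlockId(receiverId, time - blockIntervalMs))
            println(ids)                  // three identical StreamBlockId(0,0) values
            assert(ids.distinct.size == 1)
          }
        }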

      ReliableKafkaReceiver.scala
        private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
          // Get a snapshot of current offset map and store with related block id.
          val offsetSnapshot = topicPartitionOffsetMap.toMap
          blockOffsetMap.put(blockId, offsetSnapshot)
          topicPartitionOffsetMap.clear()
        }

      If the blockId is always the same, Streaming will commit the offsets before the corresponding data has really been consumed: records are still waiting to be stored and committed, but their offsets have already been overwritten and committed by an earlier commit for the same blockId.
      So when an exception occurs, the offsets have already been committed but the data is lost, since it was still only in memory and had not been consumed yet.
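
      To make the failure mode concrete, here is a minimal sketch (again plain Scala with simplified stand-ins for the Spark/Kafka types; the topic name and offsets are made up) of how a repeated blockId lets a later offset snapshot overwrite an earlier one in blockOffsetMap, so that a commit for that blockId acknowledges records that are still only buffered in memory:

        import scala.collection.mutable

        object PrematureCommitDemo {
          // Simplified stand-ins for the receiver's types and maps
          // (the real maps are concurrent collections).
          case class StreamBlockId(receiverId: Int, uniqueId: Long)
          case class TopicAndPartition(topic: String, partition: Int)

          val topicPartitionOffsetMap = mutable.HashMap[TopicAndPartition, Long]()
          val blockOffsetMap = mutable.HashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

          def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
            blockOffsetMap.put(blockId, topicPartitionOffsetMap.toMap)
            topicPartitionOffsetMap.clear()
          }

          def main(args: Array[String]): Unit = {
            val tp = TopicAndPartition("logs", 0)   // made-up topic/partition
            val sameId = StreamBlockId(0, 0L)       // blockInterval = 0 => id never changes

            topicPartitionOffsetMap(tp) = 100L      // block 1: records up to offset 100
            rememberBlockOffsets(sameId)

            topicPartitionOffsetMap(tp) = 200L      // block 2: records up to offset 200
            rememberBlockOffsets(sameId)            // overwrites block 1's snapshot

            // Committing offsets for "block 1" now writes offset 200, even though
            // the records between 100 and 200 are still only in memory. A crash at
            // this point loses them: the committed offset already points past them.
            println(blockOffsetMap(sameId))         // Map(TopicAndPartition(logs,0) -> 200)
          }
        }

      One way to avoid this degenerate case would be to reject a non-positive spark.streaming.blockInterval when the block generator starts; this is only a suggested guard, not necessarily the exact change that resolved the issue.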

People

    Assignee: carlmartin (Zhaowei Huang)
    Reporter: carlmartin (Zhaowei Huang)
    Votes: 0
    Watchers: 2
