Description
BlockGenerator.scala
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = synchronized {
  try {
    val newBlockBuffer = currentBuffer
    currentBuffer = new ArrayBuffer[Any]
    if (newBlockBuffer.size > 0) {
      val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
      val newBlock = new Block(blockId, newBlockBuffer)
      listener.onGenerateBlock(blockId)
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
      logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
If spark.streaming.blockInterval is set to 0, the blockId generated in the code above is always the same, because time is 0 and blockIntervalMs is 0 as well, so time - blockIntervalMs never changes.
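As a rough illustration (a minimal sketch, not Spark code; FakeBlockId and blockIdsFor are made-up names), assuming the timer's trigger time advances by the block interval on each tick, a zero interval means time - blockIntervalMs never changes, so every generated block gets the same id:

// Sketch only: shows why a zero block interval yields one repeated blockId.
// Assumption: the trigger time advances by the interval on every tick.
object BlockIdSketch {
  // Stand-in for StreamBlockId(receiverId, time - blockIntervalMs)
  case class FakeBlockId(receiverId: Int, uniqueId: Long)

  def blockIdsFor(blockIntervalMs: Long, startTime: Long, ticks: Int): Seq[FakeBlockId] =
    (0 until ticks).map { i =>
      val time = startTime + i * blockIntervalMs   // with interval 0, time never advances
      FakeBlockId(receiverId = 0, uniqueId = time - blockIntervalMs)
    }

  def main(args: Array[String]): Unit = {
    println(blockIdsFor(blockIntervalMs = 200, startTime = 1000, ticks = 3).distinct.size) // 3 distinct ids
    println(blockIdsFor(blockIntervalMs = 0, startTime = 0, ticks = 3).distinct.size)      // 1 id, repeated
  }
}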
ReliableKafkaReceiver.scala
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
  // Get a snapshot of current offset map and store with related block id.
  val offsetSnapshot = topicPartitionOffsetMap.toMap
  blockOffsetMap.put(blockId, offsetSnapshot)
  topicPartitionOffsetMap.clear()
}
If the blockId is always the same, Streaming will commit the offsets before the data is actually consumed: the records are still waiting to be committed, but the offsets have already been updated and committed by the previous commit for that same blockId.
So when an exception occurs, the offsets have already been committed, but the data is lost, since it was still in memory and had not been consumed yet.
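A minimal sketch of the resulting overwrite (illustrative names only, not the real ReliableKafkaReceiver API): because the block-to-offset map is keyed by blockId, a repeated blockId makes each new offset snapshot replace the previous one, so committing the offsets recorded for that blockId can cover records that are still only buffered in memory:

// Sketch only: repeated blockId key => earlier offset snapshot silently replaced.
import scala.collection.mutable

object OffsetOverwriteSketch {
  case class FakeBlockId(uniqueId: Long)
  type TopicPartition = (String, Int)

  val blockOffsetMap = mutable.HashMap.empty[FakeBlockId, Map[TopicPartition, Long]]

  def rememberBlockOffsets(blockId: FakeBlockId, snapshot: Map[TopicPartition, Long]): Unit =
    blockOffsetMap.put(blockId, snapshot)   // same key => previous snapshot is overwritten

  def commitOffsets(blockId: FakeBlockId): Unit =
    println(s"committing ${blockOffsetMap.get(blockId)} for $blockId")

  def main(args: Array[String]): Unit = {
    val sameId = FakeBlockId(-1)                              // blockInterval = 0 => always the same id
    rememberBlockOffsets(sameId, Map(("topic", 0) -> 100L))   // first block's records, still in memory
    rememberBlockOffsets(sameId, Map(("topic", 0) -> 250L))   // later records, also still in memory
    commitOffsets(sameId)  // commits offset 250 even though nothing up to 250 is safely stored
  }
}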