Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-7467

NoSuchElementException is raised because controlBatch is empty

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 1.1.0
    • 1.0.3, 1.1.2, 2.0.1, 2.1.0
    • core
    • None

    Description

      Somehow, log cleaner died because of NoSuchElementException when it calls onControlBatchRead:

      [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
      [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
      java.util.NoSuchElementException
        at java.util.Collections$EmptyIterator.next(Collections.java:4189)
        at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
        at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
        at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
        at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
        at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
        at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
        at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
        at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
        at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
        at kafka.log.Cleaner.clean(LogCleaner.scala:438)
        at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
      [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
      

      The following code does not seem to expect the controlBatch to be empty:

      https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946

        def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
          consumeAbortedTxnsUpTo(controlBatch.lastOffset)
      
          val controlRecord = controlBatch.iterator.next()
          val controlType = ControlRecordType.parse(controlRecord.key)
          val producerId = controlBatch.producerId
      

      MemoryRecords.filterTo copies the original control attribute for empty batches, which results in empty control batches. Trying to read the control type of an empty batch causes the error.

        else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
          if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
              throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
      
          bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
          DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
                  batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
                  batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
                  batch.isTransactional(), batch.isControlBatch());
          filterResult.updateRetainedBatchMetadata(batch, 0, true);
      

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            bob-barrett Bob Barrett
            badai Badai Aqrandista
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment