Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-7467

NoSuchElementException is raised because controlBatch is empty

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 1.1.0
    • 1.0.3, 1.1.2, 2.0.1, 2.1.0
    • core
    • None

    Description

      Somehow, log cleaner died because of NoSuchElementException when it calls onControlBatchRead:

      [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
      [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
      java.util.NoSuchElementException
        at java.util.Collections$EmptyIterator.next(Collections.java:4189)
        at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
        at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
        at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
        at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
        at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
        at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
        at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
        at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
        at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
        at kafka.log.Cleaner.clean(LogCleaner.scala:438)
        at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
      [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
      

      The following code does not seem to expect the controlBatch to be empty:

      https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946

        def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
          consumeAbortedTxnsUpTo(controlBatch.lastOffset)
      
          val controlRecord = controlBatch.iterator.next()
          val controlType = ControlRecordType.parse(controlRecord.key)
          val producerId = controlBatch.producerId
      

      MemoryRecords.filterTo copies the original control attribute for empty batches, which results in empty control batches. Trying to read the control type of an empty batch causes the error.

        else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
          if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
              throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
      
          bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
          DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
                  batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
                  batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
                  batch.isTransactional(), batch.isControlBatch());
          filterResult.updateRetainedBatchMetadata(batch, 0, true);
      

      Attachments

        Issue Links

          Activity

            People

              bob-barrett Bob Barrett
              badai Badai Aqrandista
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: