KAFKA-7467: NoSuchElementException is raised because controlBatch is empty

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.0.3, 1.1.2, 2.0.1, 2.1.0
    • Component/s: core
    • Labels:
      None

      Description

      The log cleaner thread died with a NoSuchElementException thrown from onControlBatchRead:

      [2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
      [2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
      java.util.NoSuchElementException
        at java.util.Collections$EmptyIterator.next(Collections.java:4189)
        at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
        at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
        at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
        at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
        at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
        at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
        at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
        at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
        at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
        at kafka.log.Cleaner.clean(LogCleaner.scala:438)
        at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
      [2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
      

      The following code does not seem to expect the controlBatch to be empty:

      https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946

        def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
          consumeAbortedTxnsUpTo(controlBatch.lastOffset)
      
          val controlRecord = controlBatch.iterator.next()
          val controlType = ControlRecordType.parse(controlRecord.key)
          val producerId = controlBatch.producerId
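
The failure mode in the snippet above can be reproduced outside Kafka. The following is a minimal Java sketch, not Kafka code: the class EmptyControlBatchGuard and both methods are hypothetical, and a List<String> stands in for the records of a control batch. It shows that calling next() on an empty iterator throws NoSuchElementException, and that checking hasNext() first avoids it. This is one possible guard, not necessarily the fix that was merged.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical stand-in for the cleaner's control-batch handling; not Kafka code.
class EmptyControlBatchGuard {

    // Mirrors the failing pattern: next() is called without checking hasNext().
    static String readFirstUnchecked(List<String> controlRecords) {
        Iterator<String> it = controlRecords.iterator();
        return it.next(); // throws NoSuchElementException when the list is empty
    }

    // Guarded variant: an empty batch yields Optional.empty() instead of throwing.
    static Optional<String> readFirstGuarded(List<String> controlRecords) {
        Iterator<String> it = controlRecords.iterator();
        return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
    }

    public static void main(String[] args) {
        List<String> emptyBatch = Collections.emptyList();
        try {
            readFirstUnchecked(emptyBatch);
        } catch (NoSuchElementException e) {
            System.out.println("unchecked read failed: " + e.getClass().getName());
        }
        System.out.println("guarded read present: " + readFirstGuarded(emptyBatch).isPresent());
        List<String> commitBatch = Collections.singletonList("COMMIT");
        System.out.println("guarded read value: " + readFirstGuarded(commitBatch).get());
    }
}
```

With a guard of this kind, the caller still has to decide what an empty control batch means for retention; the sketch only shows how to avoid the exception.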
      

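      When a batch is retained as empty (BatchRetention.RETAIN_EMPTY), MemoryRecords.filterTo copies the original batch's control attribute into the empty header, so the result can be an empty control batch. When the cleaner later reads such a batch, onControlBatchRead calls iterator.next() to get the control type, and the empty iterator throws NoSuchElementException.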

        else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
          if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
              throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
      
          bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
          DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
                  batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
                  batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
                  batch.isTransactional(), batch.isControlBatch());
          filterResult.updateRetainedBatchMetadata(batch, 0, true);
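
How an empty control batch comes into existence can be illustrated with a toy model. This is only a sketch under stated assumptions: the Batch class, retainEmpty, and hasReadableControlRecord are hypothetical names and do not exist in Kafka; they imitate the header-copying behaviour described above.

```java
import java.util.Collections;
import java.util.List;

// Toy model of batch filtering; all names here are hypothetical, not Kafka's API.
class EmptyBatchRetentionSketch {

    static final class Batch {
        final boolean isControl;
        final List<String> records;

        Batch(boolean isControl, List<String> records) {
            this.isControl = isControl;
            this.records = records;
        }
    }

    // Drops every record but keeps an empty header, copying the control flag
    // from the original batch (the behaviour the report attributes to RETAIN_EMPTY).
    static Batch retainEmpty(Batch original) {
        return new Batch(original.isControl, Collections.<String>emptyList());
    }

    // True only when the batch is a control batch that still holds a record to read.
    static boolean hasReadableControlRecord(Batch batch) {
        return batch.isControl && !batch.records.isEmpty();
    }

    public static void main(String[] args) {
        Batch marker = new Batch(true, Collections.singletonList("ABORT"));
        Batch retained = retainEmpty(marker);
        System.out.println("still flagged as control: " + retained.isControl);
        System.out.println("record count: " + retained.records.size());
        System.out.println("readable control record: " + hasReadableControlRecord(retained));
    }
}
```

The retained batch is still flagged as a control batch but has no records, which is the state that a reader assuming "control batch implies at least one record" cannot handle.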
      

              People

              • Assignee: Bob Barrett (bob-barrett)
              • Reporter: Badai Aqrandista (badai)