Description
The log cleaner thread died with a NoSuchElementException thrown from onControlBatchRead:
[2018-09-25 14:18:31,088] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-45 (largest timestamp Fri Apr 27 16:12:39 CDT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
[2018-09-25 14:18:31,092] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
java.util.NoSuchElementException
    at java.util.Collections$EmptyIterator.next(Collections.java:4189)
    at kafka.log.CleanedTransactionMetadata.onControlBatchRead(LogCleaner.scala:945)
    at kafka.log.Cleaner.kafka$log$Cleaner$$shouldDiscardBatch(LogCleaner.scala:636)
    at kafka.log.Cleaner$$anon$5.checkBatchRetention(LogCleaner.scala:573)
    at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:157)
    at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:138)
    at kafka.log.Cleaner.cleanInto(LogCleaner.scala:604)
    at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:518)
    at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:462)
    at kafka.log.Cleaner$$anonfun$doClean$4.apply(LogCleaner.scala:461)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
    at kafka.log.Cleaner.clean(LogCleaner.scala:438)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2018-09-25 14:18:31,093] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
The following code does not seem to expect the controlBatch to be empty:
https://github.com/apache/kafka/blob/1.1/core/src/main/scala/kafka/log/LogCleaner.scala#L946
def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
  consumeAbortedTxnsUpTo(controlBatch.lastOffset)
  val controlRecord = controlBatch.iterator.next()
  val controlType = ControlRecordType.parse(controlRecord.key)
  val producerId = controlBatch.producerId
When MemoryRecords.filterTo retains an empty batch, it copies the original batch's control attribute into the empty header, which produces an empty control batch. Reading the control type of such a batch then fails, because its iterator has no records:
else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
    if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
        throw new IllegalStateException("Empty batches are only supported for magic v2 and above");

    bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
    DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
            batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
            batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
            batch.isTransactional(), batch.isControlBatch());
    filterResult.updateRetainedBatchMetadata(batch, 0, true);
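The failure mode can be reproduced outside Kafka with a small stand-alone sketch. Everything below is hypothetical and for illustration only: SketchBatch, unguardedControlType, and guardedControlType are invented names and not Kafka classes or methods, and the guard shown is just one possible way to tolerate an empty control batch, not necessarily the fix that should go into LogCleaner.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical stand-in for a record batch; this is NOT Kafka's RecordBatch.
// Each record is reduced to a String naming its control type.
final class SketchBatch implements Iterable<String> {
    private final boolean controlBatch;
    private final List<String> records;

    SketchBatch(boolean controlBatch, List<String> records) {
        this.controlBatch = controlBatch;
        this.records = records;
    }

    boolean isControlBatch() {
        return controlBatch;
    }

    @Override
    public Iterator<String> iterator() {
        return records.iterator();
    }
}

final class EmptyControlBatchSketch {
    // Same pattern as the failing call: assumes the batch holds at least one record.
    static String unguardedControlType(SketchBatch batch) {
        return batch.iterator().next();
    }

    // One possible guard (an assumption, not the confirmed fix): check hasNext()
    // first and report "no control record" instead of throwing.
    static Optional<String> guardedControlType(SketchBatch batch) {
        Iterator<String> it = batch.iterator();
        return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
    }

    public static void main(String[] args) {
        // An empty batch that still carries the control flag, as described above.
        SketchBatch empty = new SketchBatch(true, Collections.<String>emptyList());

        try {
            unguardedControlType(empty);
        } catch (NoSuchElementException e) {
            System.out.println("unguarded read threw NoSuchElementException");
        }

        System.out.println("guarded read present: " + guardedControlType(empty).isPresent());
    }
}
```

With a guard of this kind, the caller still has to decide what an empty control batch means for cleaning (for example, whether it can be discarded); the sketch only shows how the exception itself can be avoided.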