Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- Affects Version: 2.8.1
- Fix Version: None
- Component: None
Description
In our production environment, a disk failure caused data corruption. When a consumer or follower fetches from the partition leader, a CorruptRecordException is thrown:
Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less than the minimum record overhead
The call stack looks like this:
Breakpoint reached
    at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:62)
    at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:40)
    at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
    at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
    at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
    at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
    at org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:286)
    at kafka.log.LogSegment.translateOffset(LogSegment.scala:254)
    at kafka.log.LogSegment.read(LogSegment.scala:277)
    at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1161)
    at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1116)
    at kafka.log.Log.maybeHandleIOException(Log.scala:1839)  <--------------- only handles IOException
    at kafka.log.Log.read(Log.scala:1116)
    at kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:926)
    at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:989)
    at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:988)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:988)
    at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:815)
    at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:828)
    at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:680)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:107)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
    at java.lang.Thread.run(Thread.java:748)
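The exception comes from the batch-size validation at the top of the stack (FileLogInputStream.nextBatch): when a corrupted disk zeroes out a batch header, the size field reads as 0, which is below the minimum record overhead. A minimal, self-contained sketch of that check (the class name and constant are hypothetical stand-ins; 14 bytes matches Kafka's legacy record overhead of crc 4 + magic 1 + attributes 1 + key length 4 + value length 4):

```java
public class BatchSizeCheck {
    // Hypothetical stand-in for LegacyRecord.RECORD_OVERHEAD_V0 (14 bytes).
    static final int MIN_RECORD_OVERHEAD = 14;

    // Sketch of the validation: a size field read from the on-disk batch
    // header that is smaller than the minimum overhead means corruption.
    static void checkBatchSize(int size) {
        if (size < MIN_RECORD_OVERHEAD)
            throw new RuntimeException("Record size " + size +
                " is less than the minimum record overhead");
    }

    public static void main(String[] args) {
        checkBatchSize(100);   // a plausible batch size passes
        try {
            checkBatchSize(0); // a zeroed header from a corrupted disk fails
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```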
CorruptRecordException extends RetriableException, but when the broker reads from its local log segment, data corruption usually cannot be fixed by retrying.
I think local file corruption should take the log offline, but currently only IOException has a chance to take the log offline, in Log#maybeHandleIOException.
So even with 3 replicas, consumers can never resume consuming once data corruption happens on the leader.
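The core of the problem can be sketched as follows. This is not Kafka's actual code, only a minimal model of the pattern described above: the read path wraps the work in a handler that catches IOException and marks the log directory offline, but CorruptRecordException is a RuntimeException, so it bypasses the handler entirely and the log stays online while every fetch keeps failing.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class HandlerSketch {
    // Hypothetical stand-in for Kafka's CorruptRecordException, which extends
    // RetriableException -> ... -> RuntimeException, and is NOT an IOException.
    static class CorruptRecordException extends RuntimeException {
        CorruptRecordException(String msg) { super(msg); }
    }

    // Stand-in for the LogDirFailureChannel offline flag.
    static boolean logDirOffline = false;

    // Simplified model of Log#maybeHandleIOException: only IOException marks
    // the log dir offline; any RuntimeException propagates untouched.
    static <T> T maybeHandleIOException(Callable<T> fun) throws Exception {
        try {
            return fun.call();
        } catch (IOException e) {
            logDirOffline = true; // i.e. maybeAddOfflineLogDir(...)
            throw new RuntimeException("storage exception: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) throws Exception {
        // An IOException on the read path marks the dir offline...
        try {
            maybeHandleIOException(() -> { throw new IOException("disk error"); });
        } catch (RuntimeException e) { /* rethrown as a storage error */ }
        System.out.println("after IOException, offline = " + logDirOffline);

        // ...but CorruptRecordException escapes the handler, so the log is
        // never taken offline and leadership never moves to a healthy replica.
        logDirOffline = false;
        try {
            maybeHandleIOException(() -> { throw new CorruptRecordException("Record size 0"); });
        } catch (CorruptRecordException e) { /* bypasses the offline logic */ }
        System.out.println("after CorruptRecordException, offline = " + logDirOffline);
    }
}
```

This is why retrying cannot help here: the client sees a retriable error, but the broker state that would trigger failover is never updated.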