Description
We see this in the log:
Deleting segment LogSegment(baseOffset=243864, size=9269150, lastModifiedTime=1662486784182, largestRecordTimestamp=Some(1662486784160)) due to retention time 604800000ms breach based on the largest record timestamp in the segment
This then causes KafkaRaftClient to throw an exception when sending batches to the listener:
java.lang.IllegalStateException: Snapshot expected since next offset of org.apache.kafka.controller.QuorumController$QuorumMetaLogListener@195461949 is 0, log start offset is 369668 and high-watermark is 547379
    at org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$4(KafkaRaftClient.java:312)
    at java.base/java.util.Optional.orElseThrow(Optional.java:403)
    at org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$5(KafkaRaftClient.java:311)
    at java.base/java.util.OptionalLong.ifPresent(OptionalLong.java:165)
    at org.apache.kafka.raft.KafkaRaftClient.updateListenersProgress(KafkaRaftClient.java:309)
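To make the failure condition concrete, here is a minimal sketch of the kind of check that the exception message implies. It is not the actual KafkaRaftClient code: the class name, method name, and parameters are hypothetical, and the condition is inferred only from the message text (the listener's next offset is behind the log start offset, so it needs a snapshot, but none exists).

```java
import java.util.Optional;

public class ListenerProgressSketch {
    /**
     * Hypothetical model: returns the end offset of the snapshot the listener
     * must load before it can read the log, or empty if it can read directly.
     */
    static Optional<Long> snapshotToLoad(long nextOffset,
                                         long logStartOffset,
                                         long highWatermark,
                                         Optional<Long> latestSnapshotEndOffset) {
        // The listener can read from the log if its next offset is still present.
        boolean needsSnapshot = nextOffset < logStartOffset && nextOffset < highWatermark;
        if (!needsSnapshot) {
            return Optional.empty();
        }
        // The records before logStartOffset are gone, so a snapshot must cover them.
        Long snapshotEnd = latestSnapshotEndOffset.orElseThrow(() ->
            new IllegalStateException(String.format(
                "Snapshot expected since next offset is %d, log start offset is %d and high-watermark is %d",
                nextOffset, logStartOffset, highWatermark)));
        return Optional.of(snapshotEnd);
    }

    public static void main(String[] args) {
        // Values from the report: next offset 0, log start offset 369668,
        // high-watermark 547379, and no snapshot available.
        try {
            snapshotToLoad(0L, 369668L, 547379L, Optional.empty());
        } catch (IllegalStateException e) {
            System.out.println("Failed as in the report: " + e.getMessage());
        }
    }
}
```

With the offsets from the report and no snapshot, the sketch throws. If the log still started at offset 0, the same listener would be able to read without a snapshot.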
The on disk state for the cluster metadata partition confirms this:
ls __cluster_metadata-0/
00000000000000369668.index
00000000000000369668.log
00000000000000369668.timeindex
00000000000000503411.index
00000000000000503411.log
00000000000000503411.snapshot
00000000000000503411.timeindex
00000000000000548746.snapshot
leader-epoch-checkpoint
partition.metadata
quorum-state
Note that there are no checkpoint files and the log doesn't have a segment at base offset 0.
This happens because the LogConfig used for KRaft sets the retention policy to delete, which causes the method deleteOldSegments to delete old segments even if there is no snapshot covering them. For KRaft, Kafka should only delete segments that breach the log start offset.
Log configuration for KRaft:
val props = new Properties()
props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
LogConfig.validateValues(props)
val defaultLogConfig = LogConfig(props)
Segment deletion code:
def deleteOldSegments(): Int = {
  if (config.delete) {
    deleteLogStartOffsetBreachedSegments() +
      deleteRetentionSizeBreachedSegments() +
      deleteRetentionMsBreachedSegments()
  } else {
    deleteLogStartOffsetBreachedSegments()
  }
}
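The difference between the two branches can be illustrated with a simplified, self-contained model. This is a sketch under stated assumptions, not Kafka's implementation: the Segment class and both method names are hypothetical, the retention check is reduced to "now minus largest timestamp exceeds the retention time", and the timestamps for the segments at base offsets 369668 and 503411 are made up for illustration (only the values for base offset 243864 come from the log line above).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SegmentCleanupSketch {
    /** Hypothetical stand-in for a log segment. */
    static final class Segment {
        final long baseOffset;
        final long largestTimestampMs;
        Segment(long baseOffset, long largestTimestampMs) {
            this.baseOffset = baseOffset;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    /** A segment is deletable only if the next segment starts at or below the log start offset. */
    static List<Long> logStartOffsetBreached(List<Segment> segments, long logStartOffset) {
        List<Long> deletable = new ArrayList<>();
        for (int i = 0; i + 1 < segments.size(); i++) {
            if (segments.get(i + 1).baseOffset > logStartOffset) break;
            deletable.add(segments.get(i).baseOffset);
        }
        return deletable;
    }

    /** Time-based retention: deletes old segments regardless of snapshots (last segment kept). */
    static List<Long> retentionMsBreached(List<Segment> segments, long nowMs, long retentionMs) {
        List<Long> deletable = new ArrayList<>();
        for (int i = 0; i + 1 < segments.size(); i++) {
            if (nowMs - segments.get(i).largestTimestampMs <= retentionMs) break;
            deletable.add(segments.get(i).baseOffset);
        }
        return deletable;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
            new Segment(243864L, 1662486784160L),  // from the log line in the report
            new Segment(369668L, 1662900000000L),  // timestamp is an assumption
            new Segment(503411L, 1663000000000L)); // timestamp is an assumption
        long retentionMs = 604800000L;                    // 7 days, from the log line
        long nowMs = 1662486784160L + retentionMs + 1L;   // just past the retention time

        // No snapshot has advanced the log start offset past the first segment.
        System.out.println(logStartOffsetBreached(segments, 243864L));
        // Time-based retention still removes the first segment.
        System.out.println(retentionMsBreached(segments, nowMs, retentionMs));
    }
}
```

In this model the log-start-offset rule keeps the segment at base offset 243864 until a snapshot moves the log start offset forward, while the time-based rule removes it as soon as its largest timestamp is older than the retention time, which matches the behavior described in this report.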
Issue Links
- is related to KAFKA-14241 Implement the snapshot cleanup policy (Open)