  1. Kafka
  2. KAFKA-14238

KRaft replicas can delete segments not included in a snapshot

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.3.0
    • Component/s: core, kraft
    • Labels: None

    Description

      We see this in the log:

      Deleting segment LogSegment(baseOffset=243864, size=9269150, lastModifiedTime=1662486784182, largestRecordTimestamp=Some(1662486784160)) due to retention time 604800000ms breach based on the largest record timestamp in the segment 

      This then causes KafkaRaftClient to throw an exception when sending batches to the listener:

       java.lang.IllegalStateException: Snapshot expected since next offset of org.apache.kafka.controller.QuorumController$QuorumMetaLogListener@195461949 is 0, log start offset is 369668 and high-watermark is 547379
      	at org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$4(KafkaRaftClient.java:312)
      	at java.base/java.util.Optional.orElseThrow(Optional.java:403)
      	at org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$5(KafkaRaftClient.java:311)
      	at java.base/java.util.OptionalLong.ifPresent(OptionalLong.java:165)
      	at org.apache.kafka.raft.KafkaRaftClient.updateListenersProgress(KafkaRaftClient.java:309)

      The on-disk state for the cluster metadata partition confirms this:

       ls __cluster_metadata-0/
      00000000000000369668.index
      00000000000000369668.log
      00000000000000369668.timeindex
      00000000000000503411.index
      00000000000000503411.log
      00000000000000503411.snapshot
      00000000000000503411.timeindex
      00000000000000548746.snapshot
      leader-epoch-checkpoint
      partition.metadata
      quorum-state

      Notice that there are no checkpoint files and that the log doesn't have a segment at base offset 0.

      This is happening because the LogConfig used for KRaft sets the retention policy to delete, which causes the method deleteOldSegments to delete old segments even if no snapshot covers them. For KRaft, Kafka should only delete segments that breach the log start offset.

      Log configuration for KRaft:

            val props = new Properties()
            props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
            props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
            props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
            props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
            LogConfig.validateValues(props)
            val defaultLogConfig = LogConfig(props)
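
      The properties above leave time-based and size-based retention at their defaults. One possible mitigation, sketched here as an assumption and not necessarily the change that resolved this issue, is to disable both explicitly. The keys retention.ms and retention.bytes are the standard Kafka topic-level config names, and -1 means no limit for each. The object and method names are hypothetical.

```scala
import java.util.Properties

// Hypothetical helper; only the two config keys are real Kafka names.
object KRaftRetentionSketch {
  // Turns off time- and size-based retention on the given properties and returns them.
  def disableRetention(props: Properties): Properties = {
    props.setProperty("retention.ms", "-1")    // -1: no time limit
    props.setProperty("retention.bytes", "-1") // -1: no size limit
    props
  }
}
```

      With both limits disabled, the retention checks have nothing to breach, so only the log start offset would drive segment deletion.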

      Segment deletion code:

          def deleteOldSegments(): Int = {
            if (config.delete) {
              deleteLogStartOffsetBreachedSegments() +
                deleteRetentionSizeBreachedSegments() +
                deleteRetentionMsBreachedSegments()
            } else {
              deleteLogStartOffsetBreachedSegments()
            }
          }
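
      To make the difference between the two branches concrete, here is a small self-contained model of segment selection. It is a sketch under stated assumptions, not Kafka's implementation: Segment and every method below are hypothetical, the size-based check is omitted, and the segment at base offset 0 is assumed for illustration.

```scala
// Hypothetical, simplified model; Segment and these helpers are not Kafka classes.
object KRaftDeletionSketch {
  // nextBaseOffset is the base offset of the following segment (the end of this one).
  final case class Segment(baseOffset: Long, nextBaseOffset: Long, largestTimestampMs: Long)

  // Segments that lie entirely below the log start offset.
  def logStartOffsetBreached(segments: Seq[Segment], logStartOffset: Long): Seq[Segment] =
    segments.filter(_.nextBaseOffset <= logStartOffset)

  // Segments whose newest record is older than the retention time.
  def retentionMsBreached(segments: Seq[Segment], nowMs: Long, retentionMs: Long): Seq[Segment] =
    segments.filter(s => nowMs - s.largestTimestampMs > retentionMs)

  // Models the config.delete branch: start-offset breaches plus retention breaches.
  def deletableWithRetention(
    segments: Seq[Segment],
    logStartOffset: Long,
    nowMs: Long,
    retentionMs: Long
  ): Seq[Segment] =
    (logStartOffsetBreached(segments, logStartOffset) ++
      retentionMsBreached(segments, nowMs, retentionMs)).distinct

  // Models what the issue asks for in KRaft: start-offset breaches only.
  def deletableForKRaft(segments: Seq[Segment], logStartOffset: Long): Seq[Segment] =
    logStartOffsetBreached(segments, logStartOffset)
}
```

      For example, take two old segments (0 to 243864 and 243864 to 369668), one recent segment starting at 369668, and a log start offset of 0 because no snapshot has advanced it. In this model deletableWithRetention selects both old segments, while deletableForKRaft selects none.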

            People

              Assignee: jagsancio Jose Armando Garcia Sancio
              Reporter: jagsancio Jose Armando Garcia Sancio