KAFKA-13194

LogCleaner may clean past highwatermark

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 3.1.0
    • Component/s: None
    • Labels: None

    Description

      Here the cleaning point is bounded by the active segment base offset and the first unstable offset, which makes sense:

       

          // find first segment that cannot be cleaned
          // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
          // may be cleaned
          val firstUncleanableDirtyOffset: Long = Seq(

            // we do not clean beyond the first unstable offset
            log.firstUnstableOffset,

            // the active segment is always uncleanable
            Option(log.activeSegment.baseOffset),

            // the first segment whose largest message timestamp is within a minimum time lag from now
            if (minCompactionLagMs > 0) {
              // dirty log segments
              val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
              dirtyNonActiveSegments.find { s =>
                val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
                debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
                  s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
                  s"is uncleanable=$isUncleanable")
                isUncleanable
              }.map(_.baseOffset)
            } else None
          ).flatten.min
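      The bound above relies on how a Seq of Options flattens: any None entry is dropped before min is taken, so it imposes no limit at all. The following is a minimal, standalone sketch of that behaviour. CleaningBoundSketch and its parameter names are hypothetical stand-ins, not the Kafka code.

```scala
// Simplified stand-in for the bound computation; names are hypothetical.
object CleaningBoundSketch {
  def firstUncleanableOffset(
      firstUnstableOffset: Option[Long],   // None when no unstable offset is recorded
      activeSegmentBaseOffset: Long,       // always present
      firstSegmentWithinLag: Option[Long]  // None when no segment is within the compaction lag
  ): Long =
    // flatten discards every None, so only the defined bounds compete in min
    Seq(firstUnstableOffset, Option(activeSegmentBaseOffset), firstSegmentWithinLag).flatten.min
}
```

      With these hypothetical inputs, firstUncleanableOffset(Some(40L), 100L, None) yields 40, while firstUncleanableOffset(None, 100L, None) yields 100: the absent entry simply drops out of the computation.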
      
      
      

       

      But the LSO starts out as None:

      @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
      
      private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset)

      Most code that depends on the LSO goes through fetchLastStableOffsetMetadata, which defaults it to the hwm if it's not set:

       

      
        private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
          checkIfMemoryMappedBufferClosed()

          // cache the current high watermark to avoid a concurrent update invalidating the range check
          val highWatermarkMetadata = fetchHighWatermarkMetadata

          firstUnstableOffsetMetadata match {
            case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
              if (offsetMetadata.messageOffsetOnly) {
                lock synchronized {
                  val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
                  if (firstUnstableOffsetMetadata.contains(offsetMetadata))
                    firstUnstableOffsetMetadata = Some(fullOffset)
                  fullOffset
                }
              } else {
                offsetMetadata
              }
            case _ => highWatermarkMetadata
          }
        }
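      Stripped of metadata conversion and locking, the defaulting rule in the method above reduces to a small match on plain offsets. The sketch below is an illustration under that simplification. LsoDefaultSketch and lastStableOffset are hypothetical names and work on Long values rather than LogOffsetMetadata.

```scala
// Offsets-only simplification of the defaulting rule; names are hypothetical.
object LsoDefaultSketch {
  def lastStableOffset(firstUnstableOffset: Option[Long], highWatermark: Long): Long =
    firstUnstableOffset match {
      // an unstable offset below the high watermark is the effective LSO
      case Some(offset) if offset < highWatermark => offset
      // unset, or at/after the high watermark: fall back to the high watermark
      case _ => highWatermark
    }
}
```

      Under this rule the result never exceeds the high watermark, which is the property the raw firstUnstableOffset accessor does not provide.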
      
      

       

       

      This means that when no first unstable offset is set and the hwm is prior to the active segment base offset, nothing in the bound above accounts for the hwm, so the log cleaner may clean past it. This is most likely to occur after a broker restart, when the log cleaner may start cleaning before replication becomes active.
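      To make the restart scenario concrete, the sketch below compares two bounds for made-up offsets (hwm 50, active segment base 100, no unstable offset). boundFromRawLso mirrors the behaviour described in this issue. boundFromDefaultedLso is one possible way to respect the hwm and is shown only for contrast; it is not necessarily the change that resolved this issue. All names and numbers are hypothetical.

```scala
object RestartScenarioSketch {
  // Bound as described in this issue: an unset first unstable offset contributes nothing.
  def boundFromRawLso(firstUnstableOffset: Option[Long], activeSegmentBase: Long): Long =
    (firstUnstableOffset.toSeq :+ activeSegmentBase).min

  // Hypothetical alternative: use the high watermark when the unstable offset
  // is unset or not below it, then take the minimum with the active segment base.
  def boundFromDefaultedLso(firstUnstableOffset: Option[Long],
                            highWatermark: Long,
                            activeSegmentBase: Long): Long = {
    val lso = firstUnstableOffset.filter(_ < highWatermark).getOrElse(highWatermark)
    math.min(lso, activeSegmentBase)
  }

  def main(args: Array[String]): Unit = {
    val highWatermark = 50L       // hypothetical: replication has not caught up yet
    val activeSegmentBase = 100L  // hypothetical
    println(boundFromRawLso(None, activeSegmentBase))                      // prints 100, past the hwm
    println(boundFromDefaultedLso(None, highWatermark, activeSegmentBase)) // prints 50, capped at the hwm
  }
}
```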

            People

              Assignee: Lucas Bradstreet (lucasbradstreet)
              Reporter: Lucas Bradstreet (lucasbradstreet)