Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Not A Problem
-
2.3.0
-
None
-
None
Description
In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced max.compaction.lag.ms to guarantee that a record be cleaned before a certain time.
The implementation in LogCleanerManager has the following code. The path for earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, it seems that we should set the delay to 0 so that we could trigger cleaning immediately since the segment has been dirty for longer than max.compaction.lag.ms.
def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : Long = { ... val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L) val cleanUntilTime = now - maxCompactionLagMs if (earliestDirtySegmentTimestamp < cleanUntilTime) cleanUntilTime - earliestDirtySegmentTimestamp else 0L }