Details
- Bug
- Status: Resolved
- Major
- Resolution: Not A Bug
- 3.0.1
- None
- None
Description
This appears to be a regression of KAFKA-7400:
https://issues.apache.org/jira/browse/KAFKA-7400
Currently, the logic in LogManager.cleanupLogs() does not match the behavior described in that JIRA.
Analysis of the source code of LogManager.cleanupLogs():
1. When deletableLogs is collected, logs with `compact` enabled are filtered out in both branches of the if-else below.
    // clean current logs.
    val deletableLogs = {
      if (cleaner != null) {
        // prevent cleaner from working on same partitions when changing cleanup policy
        cleaner.pauseCleaningForNonCompactedPartitions()
      } else {
        currentLogs.filter {
          case (_, log) => !log.config.compact
        }
      }
    }
2. However, judging from the comments and code of the UnifiedLog.deleteOldSegments method that is called next, topics with `compact` enabled are also expected to run deleteLogStartOffsetBreachedSegments.
    /**
     * If topic deletion is enabled, delete any local log segments that have either expired due to time based retention
     * or because the log size is > retentionSize.
     *
     * Whether or not deletion is enabled, delete any local log segments that are before the log start offset
     */
    def deleteOldSegments(): Int = {
      if (config.delete) {
        deleteLogStartOffsetBreachedSegments() +
          deleteRetentionSizeBreachedSegments() +
          deleteRetentionMsBreachedSegments()
      } else {
        deleteLogStartOffsetBreachedSegments()
      }
    }
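The mismatch described in points 1 and 2 can be sketched with a small toy model. This is only an illustration of the reporter's reading of the code, not Kafka's implementation: ToyConfig, ToyLog, CleanupSketch and the counters are hypothetical stand-ins, and the sketch models only the `!log.config.compact` filter, not the cleaner branch or any other code path that might call deleteOldSegments.

```scala
// Toy model only: hypothetical stand-ins, not Kafka's LogManager or UnifiedLog.
final case class ToyConfig(compact: Boolean, delete: Boolean)

final class ToyLog(val name: String, val config: ToyConfig) {
  var startOffsetChecks = 0 // times the "log start offset" branch ran
  var retentionChecks = 0   // times the size/time retention branch ran

  // Same branch shape as the quoted deleteOldSegments, but it only counts calls.
  def deleteOldSegments(): Unit = {
    startOffsetChecks += 1 // runs whether or not `delete` is enabled
    if (config.delete) {
      retentionChecks += 1 // runs only when `delete` is enabled
    }
  }
}

object CleanupSketch {
  // Models the filter from point 1: compacted logs are dropped
  // before deleteOldSegments is ever called on them.
  def cleanupLogs(logs: Seq[ToyLog]): Seq[String] = {
    val deletableLogs = logs.filter(log => !log.config.compact)
    deletableLogs.foreach(_.deleteOldSegments())
    deletableLogs.map(_.name)
  }

  def main(args: Array[String]): Unit = {
    val compacted = new ToyLog("compacted", ToyConfig(compact = true, delete = false))
    val plain = new ToyLog("plain", ToyConfig(compact = false, delete = true))
    val visited = cleanupLogs(Seq(compacted, plain))
    println(s"visited: ${visited.mkString(", ")}")
    println(s"compacted start-offset checks: ${compacted.startOffsetChecks}")
    println(s"plain start-offset checks: ${plain.startOffsetChecks}")
  }
}
```

In this toy model the compacted log is never visited, so the else branch of deleteOldSegments is unreachable from cleanupLogs for it. Whether that matters in Kafka depends on whether another caller covers compacted logs, which the sketch does not model.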