diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 6a98134..514941c 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -75,10 +75,11 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
-                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
-                          .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each
-                          .filter(l => l.totalBytes > 0)             // skip any empty logs
+      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
+                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
+                          .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
+                               lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
+                          .filter(l => l.totalBytes > 0)             // skip any empty logs
       if(!dirtyLogs.isEmpty)
         this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio
       val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b0506d4..d0bbeb6 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -116,7 +116,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMs = props.getLongInRange("log.retention.check.interval.ms", 5*60*1000, (1, Long.MaxValue))
 
-  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "dedupe" */
+  /* the default cleanup policy for segments beyond the retention window, must be either "delete" or "compact" */
   val logCleanupPolicy = props.getString("log.cleanup.policy", "delete")
 
   /* the number of background threads to use for log cleaning */
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 19f61a9..7af2f43 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -90,7 +90,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
               val topic = pieces(0)
               val partition = pieces(1).toInt
               val offset = pieces(2).toLong
-              offsets += (TopicAndPartition(pieces(0), partition) -> offset)
+              offsets += (TopicAndPartition(topic, partition) -> offset)
               line = reader.readLine()
             }
             if(offsets.size != expectedSize)
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 89a88a7..5417628 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -159,7 +159,7 @@ class OffsetManager(val config: OffsetManagerConfig,
   def offsetsTopicConfig: Properties = {
     val props = new Properties
     props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
-    props.put(LogConfig.CleanupPolicyProp, "dedupe")
+    props.put(LogConfig.CleanupPolicyProp, "compact")
     props
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 9aeb69d..5bfa764 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -92,7 +92,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
   def makeCleaner(parts: Int,
                   minDirtyMessages: Int = 0,
                   numThreads: Int = 1,
-                  defaultPolicy: String = "dedupe",
+                  defaultPolicy: String = "compact",
                   policyOverrides: Map[String, String] = Map()): LogCleaner = {
 
     // create partitions and add them to the pool