diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 6a98134..514941c 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -75,10 +75,11 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To def grabFilthiestLog(): Option[LogToClean] = { inLock(lock) { val lastClean = allCleanerCheckpoints() - val dirtyLogs = logs.filter(l => l._2.config.compact) // skip any logs marked for delete rather than dedupe - .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress - .map(l => LogToClean(l._1, l._2, lastClean.getOrElse(l._1, 0))) // create a LogToClean instance for each - .filter(l => l.totalBytes > 0) // skip any empty logs + val dirtyLogs = logs.filter(l => l._2.config.compact) // skip any logs marked for delete rather than dedupe + .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress + .map(l => LogToClean(l._1, l._2, // create a LogToClean instance for each + lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset))) + .filter(l => l.totalBytes > 0) // skip any empty logs if(!dirtyLogs.isEmpty) this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala index 19f61a9..7af2f43 100644 --- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala +++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala @@ -90,7 +90,7 @@ class OffsetCheckpoint(val file: File) extends Logging { val topic = pieces(0) val partition = pieces(1).toInt val offset = pieces(2).toLong - offsets += (TopicAndPartition(pieces(0), partition) -> offset) + offsets += (TopicAndPartition(topic, partition) -> offset) line = reader.readLine() } if(offsets.size != expectedSize)