From 67d7ee3ca03f10bbad1c8eddb4703ea6df9d6cd7 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Mon, 6 Oct 2014 15:20:51 -0700
Subject: [PATCH 1/2] KAFKA-1641.v1

---
 core/src/main/scala/kafka/log/LogCleanerManager.scala | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index e8ced6a..5482753 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -77,8 +77,20 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
       val lastClean = allCleanerCheckpoints()
       val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
                           .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
-                          .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
-                            lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
+                          .map(l => {                                // create a LogToClean instance for each
+                            val logStartOffset = l._2.logSegments.head.baseOffset
+                            var firstDirtyOffset = lastClean.getOrElse(l._1, logStartOffset)
+                            // if the log segments are abnormally truncated and hence the checkpointed offset
+                            // is no longer valid, reset to the log starting offset and log the error event
+                            if (firstDirtyOffset < logStartOffset) {
+                              error("The checkpointed offset %d is smaller than the log starting offset %d, " +
+                                "resetting to the starting offset.".format(firstDirtyOffset, logStartOffset))
+
+                              firstDirtyOffset = l._2.logSegments.head.baseOffset
+                            }
+
+                            LogToClean(l._1, l._2, lastClean.getOrElse(l._1, firstDirtyOffset))
+                          })
                           .filter(l => l.totalBytes > 0)             // skip any empty logs
       this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
       val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
--
1.7.12.4

From 3cff929fce3621d050a6fad915e20b1c1b464b03 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Thu, 9 Oct 2014 13:03:24 -0700
Subject: [PATCH 2/2] address Joel's comments

---
 .../main/scala/kafka/log/LogCleanerManager.scala   | 43 +++++++++++++---------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 5482753..2171766 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -75,25 +75,32 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
-                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
-                          .map(l => {                                // create a LogToClean instance for each
-                            val logStartOffset = l._2.logSegments.head.baseOffset
-                            var firstDirtyOffset = lastClean.getOrElse(l._1, logStartOffset)
-                            // if the log segments are abnormally truncated and hence the checkpointed offset
-                            // is no longer valid, reset to the log starting offset and log the error event
-                            if (firstDirtyOffset < logStartOffset) {
-                              error("The checkpointed offset %d is smaller than the log starting offset %d, " +
-                                "resetting to the starting offset.".format(firstDirtyOffset, logStartOffset))
-
-                              firstDirtyOffset = l._2.logSegments.head.baseOffset
-                            }
-
-                            LogToClean(l._1, l._2, lastClean.getOrElse(l._1, firstDirtyOffset))
-                          })
-                          .filter(l => l.totalBytes > 0)             // skip any empty logs
+      val dirtyLogs = logs.filter {
+        case (topicAndPartition, log) => log.config.compact // skip any logs marked for delete rather than dedupe
+      }.filterNot {
+        case (topicAndPartition, log) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
+      }.map {
+        case (topicAndPartition, log) => // create a LogToClean instance for each
+          // if the log segments are abnormally truncated and hence the checkpointed offset
+          // is no longer valid, reset to the log starting offset and log the error event
+          val logStartOffset = log.logSegments.head.baseOffset
+          val firstDirtyOffset = {
+            val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
+            if (offset < logStartOffset) {
+              error("The checkpointed offset %d is smaller than the log starting offset %d, " +
+                "resetting to the starting offset.".format(offset, logStartOffset))

+              logStartOffset
+            } else {
+              offset
+            }
+          }
+          LogToClean(topicAndPartition, log, firstDirtyOffset)
+      }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
+
       this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
-      val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
+      // and must meet the minimum threshold for dirty byte ratio
+      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
       if(cleanableLogs.isEmpty) {
         None
       } else {
--
1.7.12.4
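
For reference, the offset sanity check this series converges on can be exercised in isolation. Below is a minimal, self-contained Scala sketch, not part of the patch: error() here is a hypothetical stand-in for the Logging helper LogCleanerManager inherits, and firstDirtyOffset takes the checkpointed offset as an Option instead of reading the cleaner checkpoint file. One caveat worth noting: in the patch as written, .format binds only to the second string literal ("resetting to the starting offset.", which has no %d placeholders), so the logged message would keep the literal %d's; the sketch parenthesizes the concatenation so format applies to the whole message.

    object FirstDirtyOffsetSketch {
      // Hypothetical stand-in for LogCleanerManager's inherited error() logger.
      private def error(msg: String): Unit = System.err.println("ERROR: " + msg)

      // Decide where cleaning should start: use the checkpointed offset if one
      // exists, but if the log has been truncated past it (so the checkpoint is
      // stale), log the problem and fall back to the base offset of the first
      // remaining segment -- the behavior patch 2 gives grabFilthiestLog().
      def firstDirtyOffset(checkpointed: Option[Long], logStartOffset: Long): Long = {
        val offset = checkpointed.getOrElse(logStartOffset)
        if (offset < logStartOffset) {
          // Parenthesized so that format() applies to the whole message,
          // not just the second literal.
          error(("The checkpointed offset %d is smaller than the log starting offset %d, " +
            "resetting to the starting offset.").format(offset, logStartOffset))
          logStartOffset
        } else {
          offset
        }
      }

      def main(args: Array[String]): Unit = {
        assert(firstDirtyOffset(Some(150L), 100L) == 150L) // normal: checkpoint at or past log start
        assert(firstDirtyOffset(Some(150L), 200L) == 200L) // truncated log: stale checkpoint is reset
        assert(firstDirtyOffset(None, 100L) == 100L)       // no checkpoint yet: clean from log start
      }
    }

The val-with-block form of firstDirtyOffset mirrors patch 2's main change over patch 1: the reset value is produced by the expression itself rather than mutated via var, which also removes patch 1's redundant second lastClean.getOrElse lookup when constructing LogToClean.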