From 23b5e958bc04f01e808e75a80dab6802879ed5a5 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Mon, 6 Oct 2014 15:20:51 -0700
Subject: [PATCH] KAFKA-1641.v1

---
 core/src/main/scala/kafka/log/LogCleanerManager.scala | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index e8ced6a..5482753 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -77,8 +77,20 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     val lastClean = allCleanerCheckpoints()
     val dirtyLogs = logs.filter(l => l._2.config.compact) // skip any logs marked for delete rather than dedupe
       .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
-      .map(l => LogToClean(l._1, l._2, // create a LogToClean instance for each
-        lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
+      .map(l => { // create a LogToClean instance for each
+        val logStartOffset = l._2.logSegments.head.baseOffset
+        var firstDirtyOffset = lastClean.getOrElse(l._1, logStartOffset)
+        // if the log segments are abnormally truncated and hence the checkpointed offset
+        // is no longer valid, reset to the log starting offset and log the error event
+        if (firstDirtyOffset < logStartOffset) {
+          error(("The checkpointed offset %d is smaller than the log starting offset %d, " +
+            "resetting to the starting offset.").format(firstDirtyOffset, logStartOffset))
+
+          firstDirtyOffset = logStartOffset
+        }
+
+        LogToClean(l._1, l._2, firstDirtyOffset)
+      })
       .filter(l => l.totalBytes > 0) // skip any empty logs
     this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0
     val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
-- 
1.7.12.4