diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 683d722..e8ced6a 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -58,7 +58,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To private val pausedCleaningCond = lock.newCondition() /* a gauge for tracking the cleanable ratio of the dirtiest log */ - private var dirtiestLogCleanableRatio = 0.0 + @volatile private var dirtiestLogCleanableRatio = 0.0 newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt }) /** @@ -79,9 +79,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress .map(l => LogToClean(l._1, l._2, // create a LogToClean instance for each lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset))) - .filter(l => l.totalBytes > 0) // skip any empty logs - if(!dirtyLogs.isEmpty) - this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio + .filter(l => l.totalBytes > 0) // skip any empty logs + this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0 val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio if(cleanableLogs.isEmpty) { None @@ -126,7 +125,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To case LogCleaningInProgress => inProgress.put(topicAndPartition, LogCleaningAborted) case s => - throw new IllegalStateException(("Partiiton %s can't be aborted and pasued since it's in %s state").format(topicAndPartition, s)) + throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state." + .format(topicAndPartition, s)) } } while (!isCleaningInState(topicAndPartition, LogCleaningPaused)) @@ -142,17 +142,19 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To inLock(lock) { inProgress.get(topicAndPartition) match { case None => - throw new IllegalStateException(("Partiiton %s can't be resumed since it's never paused").format(topicAndPartition)) + throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused." + .format(topicAndPartition)) case Some(state) => state match { case LogCleaningPaused => inProgress.remove(topicAndPartition) case s => - throw new IllegalStateException(("Partiiton %s can't be resumed since it's in %s state").format(topicAndPartition, s)) + throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state." + .format(topicAndPartition, s)) } } } - info("The cleaning for partition %s is resumed".format(topicAndPartition)) + info("Compaction for partition %s is resumed".format(topicAndPartition)) } /** @@ -194,7 +196,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To inProgress.put(topicAndPartition, LogCleaningPaused) pausedCleaningCond.signalAll() case s => - throw new IllegalStateException(("In-progress partiiton %s can't be in %s state").format(topicAndPartition, s)) + throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s)) } } }