diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b3ab522..46df8d9 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -75,6 +75,9 @@ class Log(val dir: File,
 
   newGauge(name + "-" + "LogEndOffset",
            new Gauge[Long] { def value = logEndOffset })
+
+  newGauge(name + "-" + "Size",
+           new Gauge[Long] { def value = size })
 
   /** The name of this log */
   def name = dir.getName()
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 312204c..b9ffe00 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -19,12 +19,15 @@ package kafka.log
 
 import scala.collection._
 import scala.math
+import java.util.concurrent.TimeUnit
 import java.nio._
 import java.util.Date
 import java.io.File
 import kafka.common._
 import kafka.message._
 import kafka.utils._
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 import java.lang.IllegalStateException
@@ -63,7 +66,8 @@ import java.lang.IllegalStateException
 class LogCleaner(val config: CleanerConfig,
                  val logDirs: Array[File],
                  val logs: Pool[TopicAndPartition, Log],
-                 time: Time = SystemTime) extends Logging {
+                 time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
+
   /* for managing the state of partitions being cleaned. */
   private val cleanerManager = new LogCleanerManager(logDirs, logs);
 
@@ -71,11 +75,33 @@ class LogCleaner(val config: CleanerConfig,
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
                                         checkIntervalMs = 300,
                                         throttleDown = true,
+                                        "cleaner-io",
+                                        "bytes",
                                         time = time)
 
   /* the threads */
   private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
 
+  /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
+  newGauge("max-buffer-utilization-percent",
+           new Gauge[Int] {
+             def value: Int = cleaners.map(_.lastStats).map(100 * _.bufferUtilization).max.toInt
+           })
+  /* a metric to track the recopy rate of each thread's last cleaning */
+  newGauge("cleaner-recopy-percent",
+           new Gauge[Int] {
+             def value: Int = {
+               val stats = cleaners.map(_.lastStats)
+               val recopyRate = stats.map(_.bytesWritten).sum.toDouble / math.max(stats.map(_.bytesRead).sum, 1)
+               (100 * recopyRate).toInt
+             }
+           })
+  /* a metric to track the maximum cleaning time for the last cleaning from each thread */
+  newGauge("max-clean-time-secs",
+           new Gauge[Int] {
+             def value: Int = cleaners.map(_.lastStats).map(_.elapsedSecs).max.toInt
+           })
+
   /**
    * Start the background cleaning
    */
@@ -147,6 +173,8 @@ class LogCleaner(val config: CleanerConfig,
                               time = time,
                               checkDone = checkDone)
 
+    @volatile var lastStats: CleanerStats = new CleanerStats()
+
     private def checkDone(topicAndPartition: TopicAndPartition) {
       if (!isRunning.get())
         throw new ThreadShutdownException
@@ -173,7 +201,7 @@ class LogCleaner(val config: CleanerConfig,
       var endOffset = cleanable.firstDirtyOffset
       try {
         endOffset = cleaner.clean(cleanable)
-        logStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
+        recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
       } catch {
         case pe: LogCleaningAbortedException => // task can be aborted, let it go.
       } finally {
@@ -185,7 +213,8 @@ class LogCleaner(val config: CleanerConfig,
     /**
      * Log out statistics on a single run of the cleaner.
      */
-    def logStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
+    def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
+      this.lastStats = stats
       def mb(bytes: Double) = bytes / (1024*1024)
       val message =
         "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) +
@@ -196,6 +225,7 @@ class LogCleaner(val config: CleanerConfig,
                                                                            stats.elapsedIndexSecs,
                                                                            mb(stats.mapBytesRead)/stats.elapsedIndexSecs,
                                                                            100 * stats.elapsedIndexSecs.toDouble/stats.elapsedSecs) +
+        "\tBuffer utilization: %.1f%%%n".format(100 * stats.bufferUtilization) +
         "\tCleaned %,.1f MB in %.1f seconds (%,.1f Mb/sec, %.1f%% of total time)%n".format(mb(stats.bytesRead),
                                                                                            stats.elapsedSecs - stats.elapsedIndexSecs,
                                                                                            mb(stats.bytesRead)/(stats.elapsedSecs - stats.elapsedIndexSecs), 100 * (stats.elapsedSecs - stats.elapsedIndexSecs).toDouble/stats.elapsedSecs) +
@@ -218,7 +248,7 @@ class LogCleaner(val config: CleanerConfig,
  * @param time The time instance
  */
 private[log] class Cleaner(val id: Int,
-                           offsetMap: OffsetMap,
+                           val offsetMap: OffsetMap,
                            ioBufferSize: Int,
                            maxIoBufferSize: Int,
                            dupBufferLoadFactor: Double,
@@ -269,6 +299,9 @@ private[log] class Cleaner(val id: Int,
     info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
     for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
       cleanSegments(log, group, offsetMap, deleteHorizonMs)
+
+    // record buffer utilization
+    stats.bufferUtilization = offsetMap.utilization
     stats.allDone()
     endOffset
   }
@@ -504,6 +537,7 @@ private[log] class Cleaner(val id: Int,
  */
 private case class CleanerStats(time: Time = SystemTime) {
   var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead, messagesWritten = 0L
+  var bufferUtilization = 0.0d
   clear()
 
   def readMessage(size: Int) {
@@ -543,6 +577,7 @@ private case class CleanerStats(time: Time = SystemTime) {
     mapMessagesRead = 0L
     messagesRead = 0L
     messagesWritten = 0L
+    bufferUtilization = 0.0d
   }
 }
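Note on the gauge arithmetic above: cleaner-recopy-percent divides the total bytes written by the total bytes read across the last run of every cleaner thread, and the max(..., 1) guard makes the gauge read 0 rather than divide by zero before any cleaning has run. A worked example of that arithmetic, using illustrative numbers that are not part of this patch:

    // Two cleaner threads whose last runs read 100 MB and 50 MB and wrote back
    // 40 MB and 10 MB give (40 + 10) / (100 + 50) = 0.333..., reported as 33:
    // a third of the scanned data survived deduplication and was recopied.
    object RecopyPercentExample {
      def main(args: Array[String]) {
        val bytesRead    = Seq(100L, 50L).map(_ * 1024 * 1024)
        val bytesWritten = Seq(40L, 10L).map(_ * 1024 * 1024)
        val recopyRate = bytesWritten.sum.toDouble / math.max(bytesRead.sum, 1)
        println((100 * recopyRate).toInt) // prints 33
      }
    }

A low value means compaction is reclaiming most of what it scans; a value near 100 means the cleaner is rewriting nearly everything it reads.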
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 79e9d55..683d722 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -18,6 +18,8 @@ package kafka.log
 
 import java.io.File
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 import kafka.utils.{Logging, Pool}
 import kafka.server.OffsetCheckpoint
 import collection.mutable
@@ -39,7 +41,7 @@ private[log] case object LogCleaningPaused extends LogCleaningState
  *  While a partition is in the LogCleaningPaused state, it won't be scheduled for cleaning again, until cleaning is
  *  requested to be resumed.
  */
-private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging {
+private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
 
   override val loggerName = classOf[LogCleaner].getName
 
@@ -51,8 +53,13 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
 
   /* a global lock used to control all access to the in-progress set and the offset checkpoints */
   private val lock = new ReentrantLock
+
   /* for coordinating the pausing and the cleaning of a partition */
   private val pausedCleaningCond = lock.newCondition()
+
+  /* a gauge for tracking the cleanable ratio of the dirtiest log */
+  private var dirtiestLogCleanableRatio = 0.0
+  newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt })
 
   /**
    * @return the position processed for all logs.
@@ -68,16 +75,18 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   def grabFilthiestLog(): Option[LogToClean] = {
     inLock(lock) {
       val lastClean = allCleanerCheckpoints()
-      val cleanableLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
-                              .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
-                              .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
-                                   lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
-      val dirtyLogs = cleanableLogs.filter(l => l.totalBytes > 0)        // must have some bytes
-                                   .filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
-      if(dirtyLogs.isEmpty) {
+      val dirtyLogs = logs.filter(l => l._2.config.compact)          // skip any logs marked for delete rather than dedupe
+                          .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress
+                          .map(l => LogToClean(l._1, l._2,           // create a LogToClean instance for each
+                               lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset)))
+                          .filter(l => l.totalBytes > 0)             // skip any empty logs
+      if(!dirtyLogs.isEmpty)
+        this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio
+      val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio
+      if(cleanableLogs.isEmpty) {
         None
       } else {
-        val filthiest = dirtyLogs.max
+        val filthiest = cleanableLogs.max
         inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
         Some(filthiest)
       }
     }
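Note: the restructured grabFilthiestLog() updates max-dirty-percent before the minimum-cleanable-ratio filter is applied, so the gauge reports how dirty the dirtiest log is even while nothing is yet eligible for cleaning. A hypothetical reading, with numbers invented purely for illustration:

    // A log with 300 MB of not-yet-compacted data out of 1000 MB total has a
    // cleanable ratio of 0.30, so max-dirty-percent reads 30; with the default
    // minimum cleanable ratio of 0.5 this log would not yet be selected.
    object MaxDirtyPercentExample {
      def main(args: Array[String]) {
        val dirtyBytes = 300.0 * 1024 * 1024
        val totalBytes = 1000.0 * 1024 * 1024
        val dirtiestLogCleanableRatio = dirtyBytes / totalBytes
        println((100 * dirtiestLogCleanableRatio).toInt) // prints 30
      }
    }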
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index c6c3c75..d1a144d 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -17,6 +17,8 @@ package kafka.utils;
 
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
 import java.util.Random
 import scala.math._
@@ -33,14 +35,18 @@ import scala.math._
 @threadsafe
 class Throttler(val desiredRatePerSec: Double,
                 val checkIntervalMs: Long = 100L,
-                val throttleDown: Boolean = true,
-                val time: Time = SystemTime) extends Logging {
+                val throttleDown: Boolean = true,
+                metricName: String = "throttler",
+                units: String = "entries",
+                val time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
   private val lock = new Object
+  private val meter = newMeter(metricName, units, TimeUnit.SECONDS)
   private var periodStartNs: Long = time.nanoseconds
   private var observedSoFar: Double = 0.0
 
   def maybeThrottle(observed: Double) {
+    meter.mark(observed.toLong)
     lock synchronized {
       observedSoFar += observed
       val now = time.nanoseconds
@@ -72,7 +78,7 @@ object Throttler {
 
   def main(args: Array[String]) {
     val rand = new Random()
-    val throttler = new Throttler(100000, 100, true, SystemTime)
+    val throttler = new Throttler(100000, 100, true, time = SystemTime)
     val interval = 30000
     var start = System.currentTimeMillis
     var total = 0
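Note: because metricName and units are inserted ahead of time in the parameter list, any caller that previously passed time positionally must now name it, which is why main() above changes to time = SystemTime. A minimal sketch of the new constructor in use, mirroring the cleaner's call site in this patch (the rate is an arbitrary example value):

    import kafka.utils.{SystemTime, Throttler}

    // Caps throughput at ~10 MB/sec; the meter reports the observed rate as
    // "cleaner-io" in bytes per second. Each maybeThrottle() call marks the
    // meter with the observed amount before deciding whether to sleep.
    val throttler = new Throttler(desiredRatePerSec = 10 * 1024 * 1024,
                                  checkIntervalMs = 300,
                                  throttleDown = true,
                                  "cleaner-io",
                                  "bytes",
                                  time = SystemTime)
    throttler.maybeThrottle(4096.0)

Marking the meter outside the synchronized block keeps the metric update off the lock's critical section; the meter itself is thread-safe.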