diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index a6ec970..60b0400 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -156,6 +156,10 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe onResponse(processor) } + /** Get the next request, or null if no request arrives within the given timeout (ms) */ + def receiveRequest(timeout: Long): RequestChannel.Request = + requestQueue.poll(timeout, TimeUnit.MILLISECONDS) + /** Get the next request or block until there is one */ def receiveRequest(): RequestChannel.Request = requestQueue.take() diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index dcfca3f..6890da6 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -25,6 +25,8 @@ import java.nio.channels._ import kafka.common.KafkaException import kafka.utils._ +import com.yammer.metrics.core.Gauge +import kafka.metrics.KafkaMetricsGroup /** * An NIO socket server.
The threading model is @@ -39,13 +41,35 @@ class SocketServer(val brokerId: Int, val maxQueuedRequests: Int, val sendBufferSize: Int, val recvBufferSize: Int, - val maxRequestSize: Int = Int.MaxValue) extends Logging { + val maxRequestSize: Int = Int.MaxValue) extends Logging with KafkaMetricsGroup { this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) @volatile private var acceptor: Acceptor = null val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) + /* a metric to track the average utilization of the network processor capacity */ + newGauge("network-threads-utilization-avg", + new Gauge[Int] { + def value: Int = { + (processors.foldLeft(0.0d)((sum, processor) => { + processor.utilAvgRead = true + sum + processor.utilization + }) / numProcessorThreads * 100).toInt + } + }) + + /* a metric to track the maximum utilization of the network processor capacity */ + newGauge("network-threads-utilization-max", + new Gauge[Int] { + def value: Int = { + (processors.foldLeft(0.0d)((max, processor) => { + processor.utilMaxRead = true + Math.max(max, processor.utilization) + }) * 100).toInt + } + }) + + /** * Start the socket server */ @@ -223,16 +247,33 @@ private[kafka] class Processor(val id: Int, private val newConnections = new ConcurrentLinkedQueue[SocketChannel](); + @volatile var utilization = 0.0d + private var clockTimeInWindow = 0L + private var idleTimeInWindow = 0L + var utilAvgRead = false + var utilMaxRead = false + override def run() { startupComplete() while(isRunning) { + // reset utilization metric if needed + if (utilAvgRead && utilMaxRead) { + utilAvgRead = false + utilMaxRead = false + clockTimeInWindow = 0L + idleTimeInWindow = 0L + } + + val startClockTime = SystemTime.milliseconds // setup any new connections that have been queued up configureNewConnections() // register any new responses for writing processNewResponses()
val startSelectTime = SystemTime.milliseconds val ready = selector.select(300) - trace("Processor id " + id + " selection time = " + (SystemTime.milliseconds - startSelectTime) + " ms") + val idleTime = SystemTime.milliseconds - startSelectTime + idleTimeInWindow += idleTime + trace("Processor id " + id + " selection time = " + idleTime + " ms") if(ready > 0) { val keys = selector.selectedKeys() val iter = keys.iterator() @@ -263,6 +304,9 @@ private[kafka] class Processor(val id: Int, } } } + val clockTime = SystemTime.milliseconds - startClockTime + clockTimeInWindow += clockTime + utilization = if (clockTimeInWindow > 0) 1.0d - idleTimeInWindow.toDouble / clockTimeInWindow else 0.0d } debug("Closing selector.") closeAll() diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 871212b..3078e6a 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -21,6 +21,7 @@ import kafka.network._ import kafka.utils._ import kafka.metrics.KafkaMetricsGroup import java.util.concurrent.TimeUnit +import com.yammer.metrics.core.Gauge /** * A thread that answers kafka requests.
@@ -28,10 +29,30 @@ import java.util.concurrent.TimeUnit class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging { this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], " + @volatile var utilization = 0.0d + private var clockTimeInWindow = 0L + private var idleTimeInWindow = 0L + var utilRead = false + def run() { while(true) { try { - val req = requestChannel.receiveRequest() + // reset utilization metric if needed + if (utilRead) { + utilRead = false + clockTimeInWindow = 0L + idleTimeInWindow = 0L + } + + val startClockTime = SystemTime.milliseconds + + var req: RequestChannel.Request = null + while (req == null) { + req = requestChannel.receiveRequest(300) + } + val idleTime = SystemTime.milliseconds - startClockTime + idleTimeInWindow += idleTime + if(req eq RequestChannel.AllDone) { debug("Kafka request handler %d on broker %d received shut down command".format( id, brokerId)) @@ -40,6 +61,10 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha req.requestDequeueTimeMs = SystemTime.milliseconds trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req)) apis.handle(req) + + val clockTime = SystemTime.milliseconds - startClockTime + clockTimeInWindow += clockTime + utilization = if (clockTimeInWindow > 0) 1.0d - idleTimeInWindow.toDouble / clockTimeInWindow else 0.0d } catch { case e: Throwable => error("Exception when handling request", e) } @@ -52,7 +77,7 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, - numThreads: Int) extends Logging { + numThreads: Int) extends Logging with KafkaMetricsGroup { this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], " val threads = new Array[Thread](numThreads) val runnables = new Array[KafkaRequestHandler](numThreads) @@ -62,6 +87,17 @@
class KafkaRequestHandlerPool(val brokerId: Int, threads(i).start() } + /* a metric to track the utilization of the handler threads capacity */ + newGauge("handler-threads-utilization", + new Gauge[Int] { + def value: Int = { + (runnables.foldLeft(0.0d)((sum, handler) => { + handler.utilRead = true + sum + handler.utilization + }) / numThreads * 100).toInt + } + }) + def shutdown() { info("shutting down") for(handler <- runnables)