diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index a6ec970..60b0400 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -156,6 +156,10 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe onResponse(processor) } + /** Get the next request or block until specified time has elapsed */ + def receiveRequest(timeout: Long): RequestChannel.Request = + requestQueue.poll(timeout, TimeUnit.MILLISECONDS) + /** Get the next request or block until there is one */ def receiveRequest(): RequestChannel.Request = requestQueue.take() diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index dcfca3f..a108b01 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -24,7 +24,9 @@ import java.io._ import java.nio.channels._ import kafka.common.KafkaException +import kafka.metrics.KafkaMetricsGroup import kafka.utils._ +import com.yammer.metrics.core.Meter /** * An NIO socket server. The threading model is @@ -39,19 +41,24 @@ class SocketServer(val brokerId: Int, val maxQueuedRequests: Int, val sendBufferSize: Int, val recvBufferSize: Int, - val maxRequestSize: Int = Int.MaxValue) extends Logging { + val maxRequestSize: Int = Int.MaxValue) extends Logging with KafkaMetricsGroup { this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) @volatile private var acceptor: Acceptor = null val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) + /* a meter to track the average free capacity of the network processors */ + private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS) + /** * Start the socket server */ def startup() { for(i <- 0 until numProcessorThreads) { - processors(i) = new Processor(i, time, maxRequestSize, requestChannel) + processors(i) = new Processor(i, time, maxRequestSize, aggregateIdleMeter, + newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS), + numProcessorThreads, requestChannel) Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start() } // register the processor threads for notification of responses @@ -219,9 +226,12 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce private[kafka] class Processor(val id: Int, val time: Time, val maxRequestSize: Int, + val aggregateIdleMeter: Meter, + val idleMeter: Meter, + val totalProcessorThreads: Int, val requestChannel: RequestChannel) extends AbstractServerThread { - private val newConnections = new ConcurrentLinkedQueue[SocketChannel](); + private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() override def run() { startupComplete() @@ -230,9 +240,15 @@ private[kafka] class Processor(val id: Int, configureNewConnections() // register any new responses for writing processNewResponses() - val startSelectTime = SystemTime.milliseconds + val startSelectTime = SystemTime.nanoseconds val ready = selector.select(300) - trace("Processor id " + id + " selection time = " + (SystemTime.milliseconds - startSelectTime) + " ms") + val idleTime = SystemTime.nanoseconds - startSelectTime + idleMeter.mark(idleTime) + // to record the average idle percent, we use a single meter which + // mark all the threads idle time, but each divided by the number of threads + aggregateIdleMeter.mark(idleTime / totalProcessorThreads) + + trace("Processor id " + id + " selection time = " + idleTime + " ms") if(ready > 0) { val keys = selector.selectedKeys() val iter = keys.iterator() diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 871212b..7d4c705 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -21,17 +21,32 @@ import kafka.network._ import kafka.utils._ import kafka.metrics.KafkaMetricsGroup import java.util.concurrent.TimeUnit +import com.yammer.metrics.core.Meter /** * A thread that answers kafka requests. */ -class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging { +class KafkaRequestHandler(id: Int, + brokerId: Int, + val aggregateIdleMeter: Meter, + val totalProcessorThreads: Int, + val requestChannel: RequestChannel, + apis: KafkaApis) extends Runnable with Logging { this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], " - def run() { + def run() { while(true) { try { - val req = requestChannel.receiveRequest() + var req : RequestChannel.Request = null + while (req == null) { + // to record the average idle percent, we use a single meter which + // mark all the threads idle time, but each divided by the number of threads + val startSelectTime = SystemTime.nanoseconds + req = requestChannel.receiveRequest(300) + val idleTime = SystemTime.nanoseconds - startSelectTime + aggregateIdleMeter.mark(idleTime / totalProcessorThreads) + } + if(req eq RequestChannel.AllDone) { debug("Kafka request handler %d on broker %d received shut down command".format( id, brokerId)) @@ -52,12 +67,16 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, - numThreads: Int) extends Logging { + numThreads: Int) extends Logging with KafkaMetricsGroup { + + /* a meter to track the average free capacity of the request handlers */ + private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS) + this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], " val threads = new Array[Thread](numThreads) val runnables = new Array[KafkaRequestHandler](numThreads) for(i <- 0 until numThreads) { - runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis) + runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis) threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i)) threads(i).start() }