commit 5c30d17e9d7fc85c2a8336c8a82da03457554de9 Author: Alexis Midon Date: Wed Aug 13 17:56:46 2014 -0700 KAFKA-1594 add the total number of in-server-socket-channel responses as a metric diff --git core/src/main/scala/kafka/network/SocketServer.scala core/src/main/scala/kafka/network/SocketServer.scala index 9693bc0..c4f1f77 100644 --- core/src/main/scala/kafka/network/SocketServer.scala +++ core/src/main/scala/kafka/network/SocketServer.scala @@ -17,6 +17,7 @@ package kafka.network +import java.util import java.util.concurrent._ import java.util.concurrent.atomic._ import java.net._ @@ -28,7 +29,7 @@ import scala.collection._ import kafka.common.KafkaException import kafka.metrics.KafkaMetricsGroup import kafka.utils._ -import com.yammer.metrics.core.Meter +import com.yammer.metrics.core.{Gauge, Meter} /** * An NIO socket server. The threading model is @@ -71,6 +72,11 @@ class SocketServer(val brokerId: Int, quotas) Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start() } + + newGauge("ResponsesBeingSent", new Gauge[Int] { + def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) } + }) + // register the processor threads for notification of responses requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) @@ -170,7 +176,17 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ close(key) } } - + + def countInterestOps(ops: Int): Int = { + var count = 0 + val it = this.selector.keys().iterator() + while (it.hasNext) { + if ((it.next().interestOps() & ops) != 0) { + count += 1 + } + } + count + } } /**