Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1244116) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -25,6 +25,7 @@ import kafka.common._ import kafka.api.OffsetRequest import java.util._ +import kafka.server.BrokerTopicStat private[log] object Log { val FileSuffix = ".kafka" @@ -207,6 +208,8 @@ numberOfMessages += 1; } + BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages) + BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages) logStats.recordAppendedMessages(numberOfMessages) // they are valid, insert them in the log Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (revision 1244116) +++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (working copy) @@ -23,8 +23,8 @@ import kafka.message._ import kafka.api._ import kafka.common.ErrorMapping -import kafka.utils.SystemTime -import kafka.utils.Logging +import java.util.concurrent.atomic.AtomicLong +import kafka.utils._ /** * Logic to handle the various Kafka requests @@ -69,6 +69,8 @@ logManager.getOrCreateLog(request.topic, partition).append(request.messages) trace(request.messages.sizeInBytes + " bytes written to logs.") request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum))) + BrokerTopicStat.getBrokerTopicStat(request.topic).recordBytesIn(request.messages.sizeInBytes) + BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes) } catch { case e => @@ -100,8 +102,11 @@ try { trace("Fetching log segment for topic, partition, offset, maxSize = " + fetchRequest) val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition) - if (log != null) + if (log != null) { response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize)) + BrokerTopicStat.getBrokerTopicStat(fetchRequest.topic).recordBytesOut(response.messages.sizeInBytes) + BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(response.messages.sizeInBytes) + } else response = new MessageSetSend() } @@ -122,3 +127,48 @@ Some(response) } } + +trait BrokerTopicStatMBean { + def getMessagesIn: Long + def getBytesIn: Long + def getBytesOut: Long +} + +@threadsafe +class BrokerTopicStat extends BrokerTopicStatMBean { + private val numCumulatedMessagesIn = new AtomicLong(0) + private val numCumulatedBytesIn = new AtomicLong(0) + private val numCumulatedBytesOut = new AtomicLong(0) + + def getMessagesIn: Long = numCumulatedMessagesIn.get + + def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages) + + def getBytesIn: Long = numCumulatedBytesIn.get + + def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes) + + def getBytesOut: Long = numCumulatedBytesOut.get + + def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes) +} + +object BrokerTopicStat extends Logging { + private val stats = new Pool[String, BrokerTopicStat] + private val allTopicStat = new BrokerTopicStat + Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat") + + def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat + + def getBrokerTopicStat(topic: String): BrokerTopicStat = { + var stat = stats.get(topic) + if (stat == null) { + stat = new BrokerTopicStat + if (stats.putIfNotExists(topic, stat) == null) + Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." + topic) + else + stat = stats.get(topic) + } + return stat + } +}