Index: core/src/main/scala/kafka/log/Log.scala =================================================================== --- core/src/main/scala/kafka/log/Log.scala (revision 1244116) +++ core/src/main/scala/kafka/log/Log.scala (working copy) @@ -25,6 +25,7 @@ import kafka.common._ import kafka.api.OffsetRequest import java.util._ +import kafka.server.BrokerTopicStat private[log] object Log { val FileSuffix = ".kafka" @@ -207,6 +208,8 @@ numberOfMessages += 1; } + BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessages(numberOfMessages) + BrokerTopicStat.getBrokerAllTopicStat.recordMessages(numberOfMessages) logStats.recordAppendedMessages(numberOfMessages) // they are valid, insert them in the log Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (revision 1244116) +++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (working copy) @@ -23,8 +23,8 @@ import kafka.message._ import kafka.api._ import kafka.common.ErrorMapping -import kafka.utils.SystemTime -import kafka.utils.Logging +import java.util.concurrent.atomic.AtomicLong +import kafka.utils._ /** * Logic to handle the various Kafka requests @@ -69,6 +69,8 @@ logManager.getOrCreateLog(request.topic, partition).append(request.messages) trace(request.messages.sizeInBytes + " bytes written to logs.") request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum))) + BrokerTopicStat.getBrokerTopicStat(request.topic).recordBytes(request.messages.sizeInBytes) + BrokerTopicStat.getBrokerAllTopicStat.recordBytes(request.messages.sizeInBytes) } catch { case e => @@ -122,3 +124,42 @@ Some(response) } } + +trait BrokerTopicStatMBean { + def getMessages: Long + def getBytes: Long +} + +@threadsafe +class BrokerTopicStat extends BrokerTopicStatMBean { + private val numCumulatedMessages = new AtomicLong(0) + private val numCumulatedBytes = new AtomicLong(0) + + def getMessages: Long = numCumulatedMessages.get + + def recordMessages(nMessages: Int) = numCumulatedMessages.getAndAdd(nMessages) + + def getBytes: Long = numCumulatedBytes.get + + def recordBytes(nBytes: Long) = numCumulatedBytes.getAndAdd(nBytes) +} + +object BrokerTopicStat extends Logging { + private val stats = new Pool[String, BrokerTopicStat] + private val allTopicStat = new BrokerTopicStat + Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat") + + def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat + + def getBrokerTopicStat(topic: String): BrokerTopicStat = { + var stat = stats.get(topic) + if (stat == null) { + stat = new BrokerTopicStat + if (stats.putIfNotExists(topic, stat) == null) + Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." + topic) + else + stat = stats.get(topic) + } + return stat + } +}