Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerIterator.scala (revision 1346922) +++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala (working copy) @@ -47,6 +47,7 @@ val topic = currentTopicInfo.topic trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1) + ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1) item } Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala =================================================================== --- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (revision 1346922) +++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (working copy) @@ -61,6 +61,8 @@ chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset)) val newOffset = fetchedOffset.addAndGet(size) debug("updated fetch offset of ( %s ) to %d".format(this, newOffset)) + ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size) + ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size) } size } Index: core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (revision 1346922) +++ core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (working copy) @@ -22,20 +22,30 @@ trait ConsumerTopicStatMBean { def getMessagesPerTopic: Long + def getBytesPerTopic: Long } @threadsafe class ConsumerTopicStat extends ConsumerTopicStatMBean { private val numCumulatedMessagesPerTopic = new AtomicLong(0) + private val numCumulatedBytesPerTopic = new AtomicLong(0) def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) + + def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get + + def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes) } object ConsumerTopicStat extends Logging { private val stats = new Pool[String, ConsumerTopicStat] + private val allTopicStat = new ConsumerTopicStat + Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat") + def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat + def getConsumerTopicStat(topic: String): ConsumerTopicStat = { var stat = stats.get(topic) if (stat == null) {