Index: core/src/main/scala/kafka/consumer/KafkaMessageStream.scala =================================================================== --- core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (revision 1178198) +++ core/src/main/scala/kafka/consumer/KafkaMessageStream.scala (working copy) @@ -26,14 +26,15 @@ * All calls to elements should produce the same thread-safe iterator? Should have a seperate thread * that feeds messages into a blocking queue for processing. */ -class KafkaMessageStream[T](private val queue: BlockingQueue[FetchedDataChunk], +class KafkaMessageStream[T](val topic: String, + private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, private val decoder: Decoder[T]) extends Iterable[T] with java.lang.Iterable[T]{ private val logger = Logger.getLogger(getClass()) private val iter: ConsumerIterator[T] = - new ConsumerIterator[T](queue, consumerTimeoutMs, decoder) + new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder) /** * Create an iterator over messages in the stream. Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerIterator.scala (revision 1178198) +++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala (working copy) @@ -29,7 +29,8 @@ * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown * */ -class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk], +class ConsumerIterator[T](private val topic: String, + private val channel: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, private val decoder: Decoder[T]) extends IteratorTemplate[T] { @@ -47,6 +48,7 @@ currentTopicInfo.resetConsumeOffset(consumedOffset) if(logger.isTraceEnabled) logger.trace("Setting consumed offset to %d".format(consumedOffset)) + ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1) decodedMessage } Index: core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala =================================================================== --- core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (revision 0) +++ core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (revision 0) @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.consumer + +import java.util.concurrent.atomic.AtomicLong +import org.apache.log4j.Logger +import kafka.utils.{Pool, Utils, threadsafe} + +trait ConsumerTopicStatMBean { + def getMessagesPerTopic: Long +} + +@threadsafe +class ConsumerTopicStat extends ConsumerTopicStatMBean { + private val numCumulatedMessagesPerTopic = new AtomicLong(0) + + def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get + + def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) +} + +object ConsumerTopicStat { + private val logger = Logger.getLogger(getClass()) + private val stats = new Pool[String, ConsumerTopicStat] + + def getConsumerTopicStat(topic: String): ConsumerTopicStat = { + var stat = stats.get(topic) + if (stat == null) { + stat = new ConsumerTopicStat + if (stats.putIfNotExists(topic, stat) == null) + Utils.swallow(logger.warn, Utils.registerMBean(stat, "kafka:type=kafka.ConsumerTopicStat." + topic)) + else + stat = stats.get(topic) + } + return stat + } +} Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala =================================================================== --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (revision 1178198) +++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (working copy) @@ -186,7 +186,7 @@ for (threadId <- threadIdSet) { val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks) queues.put((topic, threadId), stream) - streamList ::= new KafkaMessageStream[T](stream, config.consumerTimeoutMs, decoder) + streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder) } ret += (topic -> streamList) logger.debug("adding topic " + topic + " and stream to map..")