From c4082a3264c36ab6c40937999ee0293c419a73fd Mon Sep 17 00:00:00 2001
From: Pierre-Yves Ritschard <pyr@spootnik.org>
Date: Tue, 24 Jan 2012 18:02:08 +0100
Subject: [PATCH] Provide a patch for KAFKA-251

---
 .../consumer/ZookeeperConsumerConnector.scala      |   65 +++++++++++++++++++-
 1 files changed, 64 insertions(+), 1 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 7b9804f..243ea5f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -81,6 +81,34 @@ trait ZookeeperConsumerConnectorMBean {
   def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
 }
 
+
+trait ZookeeperConsumerTopicPartitionMBean {
+  def getGroupId: String
+  def getPartition: String
+  def getTopic: String
+  def getFetchOffset: Long
+  def getConsumeOffset: Long
+  def getConsumeLag: Long
+}
+
+private[kafka] class ZookeeperConsumerTopicPartition(val topic: String,
+						     val info: PartitionTopicInfo,
+						     val groupId: String)
+  extends ZookeeperConsumerTopicPartitionMBean {
+
+    def getGroupId: String = groupId
+
+    def getTopic: String = topic
+
+    def getPartition: String = info.partition.name
+
+    def getFetchOffset: Long = info.getFetchOffset
+
+    def getConsumeOffset: Long = info.getConsumeOffset
+
+    def getConsumeLag: Long = info.getFetchOffset - info.getConsumeOffset
+}
+
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
   extends ConsumerConnector with ZookeeperConsumerConnectorMBean with Logging {
@@ -427,7 +455,40 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       oldPartitionsPerTopicMap.clear
     }
 
+
+    private def getMBeanName(topic: String, partition: String, groupId: String) : String = {
+      String.format("kafka:type=kafka.ConsumerPartitionTopicStats,topic=%s,partition=%s,groupid=%s",
+                     topic,
+                     partition,
+                     groupId)
+    }
+
+    def registerMBeans() {
+      for ((topic, infos) <- topicRegistry) {
+        for (partinfo <- infos.values) {
+	  val mbeanName = getMBeanName(topic, partinfo.partition.name, config.groupId)
+          val consumerTopicMBean = new ZookeeperConsumerTopicPartition(topic, partinfo, config.groupId)
+
+	  info("registering MBean + " mbeanName)
+          Utils.registerMBean(consumerTopicMBean, mbeanName)
+        }
+      }
+      info("done")
+    }
+
+    def unregisterMBeans() {
+      for ((topic, infos) <- topicRegistry) {
+	for (partinfo <- infos.values) {
+	  val mbeanName = getMBeanName(topic, partinfo.partition.name, config.groupId)
+
+	  info("unregistering MBean")
+	  Utils.unregisterMBean(mbeanName)
+        }
+      }
+    }
+
     def syncedRebalance() {
+      unregisterMBeans()
       rebalanceLock synchronized {
         for (i <- 0 until config.maxRebalanceRetries) {
           info("begin rebalancing consumer " + consumerIdString + " try #" + i)
@@ -443,8 +504,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
               info("exception during rebalance ", e)
           }
           info("end rebalancing consumer " + consumerIdString + " try #" + i)
-          if (done)
+          if (done) {
+            registerMBeans()
             return
+          }
           // release all partitions, reset state and retry
           releasePartitionOwnership()
           Thread.sleep(config.rebalanceBackoffMs)
-- 
1.7.8.4

