Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision 66024af)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision c4b2647101ab857dda4cb831863dd37e5cb4df55)
@@ -33,8 +33,8 @@
 import kafka.common.{ConsumerRebalanceFailedException, InvalidConfigException}
 import java.lang.IllegalStateException
 import kafka.utils.ZkUtils._
+import kafka.javaapi.consumer.ConsumerListener
 
-
 /**
  * This class handles the consumers interaction with zookeeper
  *
@@ -104,6 +104,8 @@
 
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
+  private var consumerListener: ConsumerListener = null;
+
   val consumerIdString = {
     var consumerUuid : String = null
     config.consumerId match {
@@ -128,6 +130,13 @@
 
   def this(config: ConsumerConfig) = this(config, true)
 
+  /**
+   * @see ZookeeperConsumerConnector#setListener
+   */
+  def setListener(listener: ConsumerListener) {
+    consumerListener = listener
+  }
+
   def createMessageStreams[T](topicCountMap: Map[String,Int],
                               decoder: Decoder[T])
       : Map[String,List[KafkaStream[T]]] = {
@@ -458,8 +467,11 @@
           }
           info("end rebalancing consumer " + consumerIdString + " try #" + i)
           if (done) {
+            if (consumerListener != null) {
+              consumerListener.afterRebalance(true);
+            }
             return
-          }else {
+          } else {
               /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
                * clear the cache */
               info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
@@ -467,6 +479,9 @@
           // stop all fetchers and clear all the queues to avoid data duplication
           closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
           Thread.sleep(config.rebalanceBackoffMs)
+        }
+        if (consumerListener != null) {
+          consumerListener.afterRebalance(false);
         }
       }
 
Index: core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java	(revision 66024af)
+++ core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java	(revision c4b2647101ab857dda4cb831863dd37e5cb4df55)
@@ -66,4 +66,10 @@
    *  Shut down the connector
    */
   public void shutdown();
+
+  /**
+   * Register a listener to get notified about some internals like
+   * rebalancing.
+   */
+  public void setListener(ConsumerListener listener);
 }
Index: core/src/main/scala/kafka/javaapi/consumer/ConsumerListener.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/ConsumerListener.java	(revision c4b2647101ab857dda4cb831863dd37e5cb4df55)
+++ core/src/main/scala/kafka/javaapi/consumer/ConsumerListener.java	(revision c4b2647101ab857dda4cb831863dd37e5cb4df55)
@@ -0,0 +1,14 @@
+package kafka.javaapi.consumer;
+
+public class ConsumerListener {
+
+  /**
+   * This method is called whenever a rebalance of topic/partition
+   * assignments has been done by {@link ZookeeperConsumerConnector}.
+   *
+   * @param success if true balancing succeeded, false if not.
+   */
+  public void afterRebalance(boolean success) {
+  }
+
+}
Index: core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala	(revision 66024af)
+++ core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala	(revision c4b2647101ab857dda4cb831863dd37e5cb4df55)
@@ -85,6 +85,10 @@
     ret
   }
 
+  def setListener(listener: ConsumerListener) {
+    underlying.setListener(listener);
+  }
+
   def createMessageStreams(
         topicCountMap: java.util.Map[String,java.lang.Integer])
       : java.util.Map[String,java.util.List[KafkaStream[Message]]] =
Index: core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala	(revision 66024af)
+++ core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala	(revision c4b2647101ab857dda4cb831863dd37e5cb4df55)
@@ -23,6 +23,9 @@
 import java.lang.Thread
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.{TestUtils, ZkUtils, ZKGroupTopicDirs, TestZKUtils}
+import kafka.javaapi.consumer.ConsumerListener
+import collection.Map
+import kafka.serializer.Decoder
 
 class ZKLoadBalanceTest extends JUnit3Suite with ZooKeeperTestHarness {
   val zkConnect = TestZKUtils.zookeeperConnect
@@ -34,7 +37,6 @@
 
   override def setUp() {
     super.setUp()
-
     dirs = new ZKGroupTopicDirs(group, topic)
   }
 
@@ -45,29 +47,17 @@
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, firstConsumer))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, false)
     zkConsumerConnector1.createMessageStreams(Map(topic -> 1))
+		checkPartitionOwnerRegistry(List(("400-0", "group1_consumer1-0")))
 
-    {
-      // check Partition Owner Registry
-      val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-      val expected_1 = List( ("400-0", "group1_consumer1-0") )
-      checkSetEqual(actual_1, expected_1)
-    }
-
     // add a second consumer
     val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, secondConsumer))
-    val ZKConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, false)
-    ZKConsumerConnector2.createMessageStreams(Map(topic -> 1))
+    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, false)
+    zkConsumerConnector2.createMessageStreams(Map(topic -> 1))
     // wait a bit to make sure rebalancing logic is triggered
     Thread.sleep(200)
+		checkPartitionOwnerRegistry(List(("400-0", "group1_consumer1-0")))
 
-    {
+		{
-      // check Partition Owner Registry
-      val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
-      val expected_2 = List( ("400-0", "group1_consumer1-0") )
-      checkSetEqual(actual_2, expected_2)
-    }
-
-    {
       // add a few more partitions
       val brokers = List(
         (200, "broker2", 1111, "topic1", 2),
@@ -78,15 +68,13 @@
 
 
       // wait a bit to make sure rebalancing logic is triggered
-      Thread.sleep(1000)
-      // check Partition Owner Registry
-      val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
-      val expected_3 = List( ("200-0", "group1_consumer1-0"),
+      Thread.sleep(2000)
+			checkPartitionOwnerRegistry(List(
+				("200-0", "group1_consumer1-0"),
-                             ("200-1", "group1_consumer1-0"),
-                             ("300-0", "group1_consumer1-0"),
-                             ("300-1", "group1_consumer2-0"),
+        ("200-1", "group1_consumer1-0"),
+        ("300-0", "group1_consumer1-0"),
+        ("300-1", "group1_consumer2-0"),
-                             ("400-0", "group1_consumer2-0") )
-      checkSetEqual(actual_3, expected_3)
+        ("400-0", "group1_consumer2-0")))
     }
 
     {
@@ -95,20 +83,59 @@
 
       // wait a bit to make sure rebalancing logic is triggered
       Thread.sleep(500)
-      // check Partition Owner Registry
-      val actual_4 = getZKChildrenValues(dirs.consumerOwnerDir)
-      val expected_4 = List( ("200-0", "group1_consumer1-0"),
+			checkPartitionOwnerRegistry(List(
+				("200-0", "group1_consumer1-0"),
-                             ("200-1", "group1_consumer1-0"),
-                             ("300-0", "group1_consumer2-0"),
+				("200-1", "group1_consumer1-0"),
+				("300-0", "group1_consumer2-0"),
-                             ("300-1", "group1_consumer2-0") )
-      checkSetEqual(actual_4, expected_4)
+				("300-1", "group1_consumer2-0")))
     }
 
     zkConsumerConnector1.shutdown
-    ZKConsumerConnector2.shutdown
+    zkConsumerConnector2.shutdown
   }
 
+	def testThatConsumerListenerIsCalledAfterSuccessfulRebalance() {
+		var listener1 = new TestConsumerListener;
+		var listener2 = new TestConsumerListener;
+		ZkUtils.setupPartition(zkClient, 400, "broker1", 1111, "topic1", 2)
+		val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, firstConsumer))
+		val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, false);
+
+		zkConsumerConnector1.setListener(listener1)
+		zkConsumerConnector1.createMessageStreams(Map(topic -> 1))
+		checkPartitionOwnerRegistry(List(
+			("400-0", "group1_consumer1-0"),
+			("400-1", "group1_consumer1-0")))
+
+		assertEquals(listener1.successful, 1)
+		assertEquals(listener1.failed, 0)
+
+		// add a second consumer
+		val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, secondConsumer))
+		val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, false)
+		zkConsumerConnector2.setListener(listener2)
+		zkConsumerConnector2.createMessageStreams(Map(topic -> 1))
+		// wait a bit to make sure rebalancing logic is triggered
+		Thread.sleep(200)
+
+		assertEquals(listener1.successful, 2)
+		assertEquals(listener1.failed, 0)
+		assertEquals(listener2.successful, 1)
+		assertEquals(listener2.failed, 0)
+
+		checkPartitionOwnerRegistry(List(
+			("400-0", "group1_consumer1-0"),
+			("400-1", "group1_consumer2-0")))
+		zkConsumerConnector1.shutdown
+		zkConsumerConnector2.shutdown
+	}
+
+	private def checkPartitionOwnerRegistry(expected: List[(String, String)]) {
+		val actual = getZKChildrenValues(dirs.consumerOwnerDir)
+		checkSetEqual(actual, expected)
+	}
+
-  private def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
+	private def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
     import scala.collection.JavaConversions
     val children = zkClient.getChildren(path)
     Collections.sort(children)
@@ -122,6 +149,20 @@
     for (i <- 0 until expected.length) {
       assertEquals(expected(i)._1, actual(i)._1)
       assertEquals(expected(i)._2, actual(i)._2)
+    }
+  }
+  
+  class TestConsumerListener extends ConsumerListener {
+
+    var successful = 0
+    var failed = 0
+
+    override def afterRebalance(success: Boolean) {
+			if (success) {
+				successful += 1
+			} else {
+				failed += 1
+			}
     }
   }
 }
