Index: core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision c4b2647)
+++ core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala	(revision dd0d409c549c8b5a1608e83ee667b66b068990ef)
@@ -500,7 +500,11 @@
        * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
        */
       closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
-
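+      // Give the registered listener a chance to handle consumed-but-uncommitted
+      // messages (e.g. commit their offsets) before partition ownership is released.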
+      if (consumerListener != null) {
+        consumerListener.beforeRebalanceAttempt()
+      }
       releasePartitionOwnership(topicRegistry)
 
       var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
@@ -566,16 +568,6 @@
       fetcher match {
         case Some(f) => f.stopConnectionsToAllBrokers
         f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
-        info("Committing all offsets after clearing the fetcher queues")
-        /**
-        * here, we need to commit offsets before stopping the consumer from returning any more messages
-        * from the current data chunk. Since partition ownership is not yet released, this commit offsets
-        * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
-        * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
-        * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
-        * successfully and the fetchers restart to fetch more data chunks
-        **/
-        commitOffsets
         case None =>
       }
     }
Index: core/src/main/scala/kafka/javaapi/consumer/ConsumerListener.java
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/ConsumerListener.java	(revision c4b2647)
+++ core/src/main/scala/kafka/javaapi/consumer/ConsumerListener.java	(revision dd0d409c549c8b5a1608e83ee667b66b068990ef)
@@ -11,4 +11,27 @@
   public void afterRebalance(boolean success) {
   }
 
+  /**
+   * This method is called before each re-balance attempt, right after all
+   * fetchers have been closed and just before partition ownership is released.
+   * <p />
+   * This is the best place to handle messages that have been consumed but not
+   * yet committed. You could call {@link ZookeeperConsumerConnector#commitOffsets()}
+   * to make sure that already-consumed messages are not delivered a second time,
+   * or you could simply discard them if you want them to be re-consumed by
+   * whichever consumer the topic/partition gets assigned to next.
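+   * <p />
+   * A minimal sketch (assuming {@code connector} refers to the
+   * {@code ZookeeperConsumerConnector} this listener is registered on):
+   * <pre>
+   *   connector.setListener(new ConsumerListener() {
+   *     public void beforeRebalanceAttempt() {
+   *       connector.commitOffsets();
+   *     }
+   *   });
+   * </pre>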
+   */
+  public void beforeRebalanceAttempt() {
+  }
+
 }
Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(revision c4b2647)
+++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(revision dd0d409c549c8b5a1608e83ee667b66b068990ef)
@@ -28,6 +28,7 @@
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer.StringDecoder
+import kafka.javaapi.consumer.ConsumerListener
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
 
@@ -56,12 +57,12 @@
 
     var actualMessages: List[Message] = Nil
 
     // test consumer timeout logic
     val consumerConfig0 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
       override val consumerTimeoutMs = 200
     }
-    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val zkConsumerConnector0 = createConsumerConnector(consumerConfig0)
     val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
 
     // no messages to consume, we should hit timeout;
@@ -84,7 +85,7 @@
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val zkConsumerConnector1 = createConsumerConnector(consumerConfig1)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
@@ -94,7 +95,7 @@
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer2))
-    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+    val zkConsumerConnector2 = createConsumerConnector(consumerConfig2)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
     // send some messages to each broker
     val sentMessages2 = sendMessages(nMessages, "batch2")
@@ -107,7 +108,7 @@
     // create a consumer with empty map
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
-    val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+    val zkConsumerConnector3 = createConsumerConnector(consumerConfig3)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
     Thread.sleep(200)
@@ -125,7 +126,7 @@
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
   def testCompression() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
     requestHandlerLogger.setLevel(Level.FATAL)
 
@@ -135,7 +136,7 @@
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val zkConsumerConnector1 = createConsumerConnector(consumerConfig1)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
@@ -146,7 +147,7 @@
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer2))
-    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+    val zkConsumerConnector2 = createConsumerConnector(consumerConfig2)
     val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
     // send some messages to each broker
     val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
@@ -160,7 +161,7 @@
     println("Sending more messages for 3rd consumer")
     val consumerConfig3 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer3))
-    val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+    val zkConsumerConnector3 = createConsumerConnector(consumerConfig3)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
     Thread.sleep(200)
@@ -285,4 +286,20 @@
     }
     messages.sortWith((s,t) => s.checksum < t.checksum)
   }
+
+  /**
+   * Create a consumer connector that commits its offsets before each
+   * re-balance attempt so that already-consumed messages are not duplicated.
+   */
+  private def createConsumerConnector(consumerConfig: ConsumerConfig): ZookeeperConsumerConnector = {
+    val consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true)
+    val consumerListenerThatCommitsOffsetsOnRebalance = new ConsumerListener {
+      override def beforeRebalanceAttempt() {
+        consumerConnector.commitOffsets()
+      }
+    }
+    consumerConnector.setListener(consumerListenerThatCommitsOffsetsOnRebalance)
+    consumerConnector
+  }
+
 }
Index: core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala	(revision c4b2647)
+++ core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala	(revision dd0d409c549c8b5a1608e83ee667b66b068990ef)
@@ -94,9 +94,9 @@
     zkConsumerConnector2.shutdown
   }
 
-	def testThatConsumerListenerIsCalledAfterSuccessfulRebalance() {
-		var listener1 = new TestConsumerListener;
-		var listener2 = new TestConsumerListener;
+	def testThatConsumerListenerIsCalled() {
+		val listener1 = new TestConsumerListener
+		val listener2 = new TestConsumerListener
 		ZkUtils.setupPartition(zkClient, 400, "broker1", 1111, "topic1", 2)
 		val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, firstConsumer))
 		val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, false);
@@ -107,10 +107,12 @@
 			("400-0", "group1_consumer1-0"),
 			("400-1", "group1_consumer1-0")))
 
-		assertEquals(listener1.successful, 1)
-		assertEquals(listener1.failed, 0)
+		assertTrue(listener1.beforeAttempt >= 1)
+		assertEquals(1, listener1.afterSuccessful)
+		assertEquals(0, listener1.afterFailed)
 
 		// add a second consumer
+		listener1.reset()
 		val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, secondConsumer))
 		val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, false)
 		zkConsumerConnector2.setListener(listener2)
@@ -118,10 +120,12 @@
 		// wait a bit to make sure rebalancing logic is triggered
 		Thread.sleep(200)
 
-		assertEquals(listener1.successful, 2)
-		assertEquals(listener1.failed, 0)
-		assertEquals(listener2.successful, 1)
-		assertEquals(listener2.failed, 0)
+		assertTrue(listener1.beforeAttempt >= 1)
+		assertEquals(1, listener1.afterSuccessful)
+		assertEquals(0, listener1.afterFailed)
+		assertTrue(listener2.beforeAttempt >= 1)
+		assertEquals(1, listener2.afterSuccessful)
+		assertEquals(0, listener2.afterFailed)
 
 		checkPartitionOwnerRegistry(List(
 			("400-0", "group1_consumer1-0"),
@@ -154,15 +158,26 @@
   
   class TestConsumerListener extends ConsumerListener {
 
-    var successful = 0
-    var failed = 0
+    var afterSuccessful = 0
+    var afterFailed = 0
+    var beforeAttempt = 0
 
     override def afterRebalance(success: Boolean) {
 			if (success) {
-				successful += 1
+				afterSuccessful += 1
 			} else {
-				failed += 1
+				afterFailed += 1
 			}
+    }
+
+		override def beforeRebalanceAttempt() {
+			beforeAttempt += 1
+		}
+
+		def reset() {
+			beforeAttempt = 0
+			afterFailed = 0
+			afterSuccessful = 0
+		}
+	}
-    }
-  }
 }
