Index: core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala	(working copy)
@@ -0,0 +1,92 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+import junit.framework.Assert._
+
+import kafka.message._
+import kafka.server._
+import kafka.utils.TestUtils._
+import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.admin.CreateTopicCommand
+import org.junit.Test
+import kafka.serializer.DefaultDecoder
+import kafka.cluster.{Broker, Cluster}
+import org.scalatest.junit.JUnit3Suite
+import kafka.integration.KafkaServerTestHarness
+
+class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
+
+  val numNodes = 1
+  val configs =
+    for(props <- TestUtils.createBrokerConfigs(numNodes))
+    yield new KafkaConfig(props) {
+      override val zkConnect = TestZKUtils.zookeeperConnect
+    }
+  val messages = new mutable.HashMap[Int, Seq[Message]]
+  val topic = "topic"
+  val group = "group1"
+  val consumer0 = "consumer0"
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+  val queue = new LinkedBlockingQueue[FetchedDataChunk]
+  val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
+                                                           c.brokerId,
+                                                           0,
+                                                           queue,
+                                                           new AtomicLong(0),
+                                                           new AtomicLong(0),
+                                                           new AtomicInteger(0)))
+  val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
+
+  override def setUp() {
+    super.setUp
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+  }
+
+  @Test
+  def testDeduplicationInConsumerIterator() {
+    val messages1 = 0.until(5).map(x => new Message((configs(0).brokerId * 5 + x).toString.getBytes)).toList
+    val messages2 = 0.until(10).map(x => new Message((configs(0).brokerId * 5 + x).toString.getBytes)).toList
+    val messageSet1 = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages1:_*)
+    val messageSet2 = new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(0), messages2:_*)
+    // Enqueue the first (partial) chunk: messages at offsets 0-4, which overlap with messageSet2
+    topicInfos(0).enqueue(messageSet1)
+    assertEquals(1, queue.size)
+    // Enqueue the complete chunk (offsets 0-9); its first five messages duplicate the previous chunk
+    topicInfos(0).enqueue(messageSet2)
+    assertEquals(2, queue.size)
+
+    val zkConsumerConnector = new ZookeeperConsumerConnector(consumerConfig, false)
+    val iter: ConsumerIterator[Message] = new ConsumerIterator[Message](queue, consumerConfig.consumerTimeoutMs,
+                                                                        new DefaultDecoder, consumerConfig.enableShallowIterator)
+    var receivedMessages: List[Message] = Nil
+    for (i <- 0 until 10) {
+      assertTrue(iter.hasNext)
+      receivedMessages ::= iter.next.message
+    }
+
+    assertEquals(0, queue.size)
+    assertEquals(10, receivedMessages.size)
+    assertEquals(receivedMessages.sortWith((s,t) => s.checksum < t.checksum), messages2.sortWith((s,t) => s.checksum < t.checksum))
+  }
+}
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1403586)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -24,7 +24,6 @@
 import java.io.{InputStream, ByteArrayOutputStream, DataOutputStream}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.IteratorTemplate
-import kafka.common.InvalidMessageSizeException
 
 object ByteBufferMessageSet {
   
Index: core/src/main/scala/kafka/consumer/ConsumerIterator.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(revision 1403586)
+++ core/src/main/scala/kafka/consumer/ConsumerIterator.scala	(working copy)
@@ -73,14 +73,24 @@
         return allDone
       } else {
         currentTopicInfo = currentDataChunk.topicInfo
-        if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
+        val cdcFetchOffset = currentDataChunk.fetchOffset
+        val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
+        if (ctiConsumeOffset < cdcFetchOffset) {
           error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
-                        .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
-          currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
+            .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
+          currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
         }
-        localCurrent = if (enableShallowIterator) currentDataChunk.messages.shallowIterator
-                       else currentDataChunk.messages.iterator
+        localCurrent =
+          if (enableShallowIterator)
+            currentDataChunk.messages.shallowIterator
+          else
+            currentDataChunk.messages.iterator
+
         current.set(localCurrent)
+        // skip over messages that were already consumed (overlap between the previous and current chunk)
+        for (i <- 0L until (ctiConsumeOffset - cdcFetchOffset)) {
+          localCurrent.next()
+        }
       }
       // if we just updated the current chunk and it is empty that means the fetch size is too small!
       if(currentDataChunk.messages.validBytes == 0)
@@ -90,7 +100,7 @@
     }
     val item = localCurrent.next()
     consumedOffset = item.nextOffset
-    
+
     item.message.ensureValid() // validate checksum of message to ensure it is valid
 
     new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
===================================================================
--- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(revision 1403586)
+++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(working copy)
@@ -53,17 +53,28 @@
   def enqueue(messages: ByteBufferMessageSet) {
     val size = messages.sizeInBytes
     if(size > 0) {
+      val start = startOffset(messages)
       val next = nextOffset(messages)
       trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
-      chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
+      chunkQueue.put(new FetchedDataChunk(messages, this, start))
       fetchedOffset.set(next)
       debug("updated fetch offset of (%s) to %d".format(this, next))
       ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
       ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
     }
   }
-  
+
   /**
+   * Get the offset of the first message in the given message set, or -1 if the set is empty
+   */
+  private def startOffset(messages: ByteBufferMessageSet): Long = {
+    var firstOffset = -1L
+    val iter = messages.shallowIterator
+    if (iter.hasNext)
+      firstOffset = iter.next.offset
+    firstOffset
+  }
+  /**
    * Get the next fetch offset after this message set
    */
   private def nextOffset(messages: ByteBufferMessageSet): Long = {
