diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 1c7033f..c531cd1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -88,5 +88,11 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
 
   /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
   val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
+
+  /** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
+   *  Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
+   *  overhead of decompression.
+   */
+  val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
 }
 
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 6007d36..9c6828c 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -31,7 +31,8 @@ import kafka.message.{MessageAndOffset, MessageAndMetadata}
  */
 class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
                           consumerTimeoutMs: Int,
-                          private val decoder: Decoder[T])
+                          private val decoder: Decoder[T],
+                          val enableShallowIterator: Boolean)
   extends IteratorTemplate[MessageAndMetadata[T]] with Logging {
 
   private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
@@ -75,7 +76,8 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
                         .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
           currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
         }
-        localCurrent = currentDataChunk.messages.iterator
+        localCurrent = if (enableShallowIterator) currentDataChunk.messages.shallowIterator
+                       else currentDataChunk.messages.iterator
         current.set(localCurrent)
       }
     }
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index a9a52c5..3ef0978 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -22,14 +22,14 @@ import java.util.concurrent.BlockingQueue
 import kafka.serializer.Decoder
 import kafka.message.MessageAndMetadata
 
-
 class KafkaStream[T](private val queue: BlockingQueue[FetchedDataChunk],
-                                    consumerTimeoutMs: Int,
-                                    private val decoder: Decoder[T])
+                     consumerTimeoutMs: Int,
+                     private val decoder: Decoder[T],
+                     val enableShallowIterator: Boolean)
    extends Iterable[MessageAndMetadata[T]] with java.lang.Iterable[MessageAndMetadata[T]] {
 
   private val iter: ConsumerIterator[T] =
-    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder)
+    new ConsumerIterator[T](queue, consumerTimeoutMs, decoder, enableShallowIterator)
 
   /**
    *  Create an iterator over messages in the stream.
@@ -44,4 +44,4 @@ class KafkaStream[T](private val queue: BlockingQueue[FetchedDataChunk],
     iter.clearCurrentChunk()
   }
 
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index fd1971e..f7782df 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -197,7 +197,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       threadIdSet.map(_ => {
         val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         val stream = new KafkaStream[T](
-          queue, config.consumerTimeoutMs, decoder)
+          queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
         (queue, stream)
       })
     ).flatten.toList
@@ -746,7 +746,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       .map(e => {
         val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
         val stream = new KafkaStream[T](
-          queue, config.consumerTimeoutMs, decoder)
+          queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
         (queue, stream)
     }).toList
 
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 2f8738b..5afd6e1 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -77,9 +77,12 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
     buffer.reset()
     written
   }
-  
+
+  /** default iterator that iterates over decompressed messages */
   override def iterator: Iterator[MessageAndOffset] = internalIterator()
 
+  /** iterator over compressed messages without decompressing */
+  def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)
 
   def verifyMessageSize(maxMessageSize: Int){
     var shallowIter = internalIterator(true)
@@ -123,6 +126,9 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
         message.limit(size)
         topIter.position(topIter.position + size)
         val newMessage = new Message(message)
+        if(!newMessage.isValid)
+          throw new InvalidMessageException("message is invalid, compression codec: " + newMessage.compressionCodec
+            + " size: " + size + " curr offset: " + currValidBytes + " init offset: " + initialOffset)
 
         if(isShallow){
           currValidBytes += 4 + size
@@ -132,16 +138,12 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
         else{
           newMessage.compressionCodec match {
             case NoCompressionCodec =>
-              if(!newMessage.isValid)
-                throw new InvalidMessageException("Uncompressed essage is invalid")
               debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
               innerIter = null
               currValidBytes += 4 + size
               trace("currValidBytes = " + currValidBytes)
               new MessageAndOffset(newMessage, currValidBytes)
             case _ =>
-              if(!newMessage.isValid)
-                throw new InvalidMessageException("Compressed message is invalid")
               debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
               innerIter = CompressionUtils.decompress(newMessage).internalIterator()
               if (!innerIter.hasNext) {
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index e3dd841..0c4a4ed 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -73,18 +73,21 @@ private[async] class ProducerSendThread[T](val threadName: String,
         // check if the queue time is reached. This happens when the poll method above returns after a timeout and
         // returns a null object
         val expired = currentQueueItem == null
-        if(currentQueueItem != null) {
+        if(currentQueueItem != null)
           trace("Dequeued item for topic %s and partition %d"
               .format(currentQueueItem.getTopic, currentQueueItem.getPartition))
-          // handle the dequeued current item
-          if(cbkHandler != null)
-            events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
-          else
-            events += currentQueueItem
 
-          // check if the batch size is reached
-          full = events.size >= batchSize
+        // handle the dequeued current item
+        if(cbkHandler != null)
+          events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
+        else {
+          if (currentQueueItem != null)
+            events += currentQueueItem
         }
+
+        // check if the batch size is reached
+        full = events.size >= batchSize
+
         if(full || expired) {
           if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
           if(full) debug("Batch full. Sending..")
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 962a86d..c81c356 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -94,6 +94,10 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
+
+      //make sure shallow iterator is the same as deep iterator
+      TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
+                                     TestUtils.getMessageIterator(messageSet.iterator))
     }
 
     // test for compressed regular messages
@@ -104,6 +108,8 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
+
+      verifyShallowIterator(messageSet)
     }
 
     // test for mixed empty and non-empty messagesets uncompressed
@@ -121,6 +127,10 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
+
+      //make sure shallow iterator is the same as deep iterator
+      TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
+                                     TestUtils.getMessageIterator(mixedMessageSet.iterator))
     }
 
     // test for mixed empty and non-empty messagesets compressed
@@ -138,7 +148,15 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
       assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
+
+      verifyShallowIterator(mixedMessageSet)
     }
   }
 
+  def verifyShallowIterator(messageSet: ByteBufferMessageSet) {
      //make sure the offsets returned by a shallow iterator are a subset of those of a deep iterator
+      val shallowOffsets = messageSet.shallowIterator.map(msgAndOff => msgAndOff.offset).toSet
+      val deepOffsets = messageSet.iterator.map(msgAndOff => msgAndOff.offset).toSet
+      assertTrue(shallowOffsets.subsetOf(deepOffsets))
+  }
 }
diff --git a/system_test/mirror_maker/config/blacklisttest.consumer.properties b/system_test/mirror_maker/config/blacklisttest.consumer.properties
index 2c4c283..6ea85ec 100644
--- a/system_test/mirror_maker/config/blacklisttest.consumer.properties
+++ b/system_test/mirror_maker/config/blacklisttest.consumer.properties
@@ -24,4 +24,5 @@ zk.connectiontimeout.ms=1000000
 
 #consumer group id
 groupid=group1
+shallowiterator.enable=true
 
diff --git a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties b/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
index 2c4c283..6ea85ec 100644
--- a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
+++ b/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
@@ -24,4 +24,5 @@ zk.connectiontimeout.ms=1000000
 
 #consumer group id
 groupid=group1
+shallowiterator.enable=true
 
diff --git a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties b/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
index 6cbcdb4..e11112f 100644
--- a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
+++ b/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
@@ -24,4 +24,5 @@ zk.connectiontimeout.ms=1000000
 
 #consumer group id
 groupid=group1
+shallowiterator.enable=true
 
