diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index 13c3f77..8374968 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -64,7 +64,8 @@ trait ConsumerConnector {
   def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
                                         numStreams: Int = 1,
                                         keyDecoder: Decoder[K] = new DefaultDecoder(),
-                                        valueDecoder: Decoder[V] = new DefaultDecoder())
+                                        valueDecoder: Decoder[V] = new DefaultDecoder(),
+                                        isShallow: Boolean = false)
     : Seq[KafkaStream[K,V]]
 
   /**
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index b80c0b0..5f45020 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -34,10 +34,11 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
                              consumerTimeoutMs: Int,
                              private val keyDecoder: Decoder[K],
                              private val valueDecoder: Decoder[V],
-                             val clientId: String)
+                             val clientId: String,
+                             val isShallow: Boolean = false)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
-  private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
+  private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
   private var currentTopicInfo: PartitionTopicInfo = null
   private var consumedOffset: Long = -1L
   private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId)
@@ -82,7 +83,10 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
             .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
           currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
         }
-        localCurrent = currentDataChunk.messages.iterator
+        if (isShallow)
+          localCurrent = currentDataChunk.messages.shallowIterator
+        else
+          localCurrent = currentDataChunk.messages.iterator
 
         current.set(localCurrent)
       }
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index 31eaf86..94fc659 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -26,11 +26,12 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk],
                         consumerTimeoutMs: Int,
                         private val keyDecoder: Decoder[K],
                         private val valueDecoder: Decoder[V],
-                        val clientId: String)
+                        val clientId: String,
+                        val isShallow: Boolean = false)
    extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] {
 
   private val iter: ConsumerIterator[K,V] =
-    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId)
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId, isShallow)
 
   /**
    *  Create an iterator over messages in the stream.
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e7a692a..b8cd155 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -137,8 +137,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, 
                                         numStreams: Int, 
                                         keyDecoder: Decoder[K] = new DefaultDecoder(), 
-                                        valueDecoder: Decoder[V] = new DefaultDecoder()) = {
-    val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder)
+                                        valueDecoder: Decoder[V] = new DefaultDecoder(),
+                                        isShallow: Boolean = false) = {
+    val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams, keyDecoder, valueDecoder, isShallow)
     wildcardStreamsHandler.streams
   }
 
@@ -707,7 +708,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   class WildcardStreamsHandler[K,V](topicFilter: TopicFilter,
                                   numStreams: Int,
                                   keyDecoder: Decoder[K],
-                                  valueDecoder: Decoder[V])
+                                  valueDecoder: Decoder[V],
+                                  isShallow: Boolean = false)
                                 extends TopicEventHandler[String] {
 
     if (messageStreamCreated.getAndSet(true))
@@ -721,7 +723,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                           config.consumerTimeoutMs, 
                                           keyDecoder, 
                                           valueDecoder, 
-                                          config.clientId)
+                                          config.clientId,
+                                          isShallow)
         (queue, stream)
     }).toList
 
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 0a95248..d2982fe 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -28,6 +28,9 @@ class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends Message
     this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer)
   }
 
+  // Compression together with a message key is currently unsupported, since no
+  // caller needs it; support may be added in the future.
+
   def this(messages: java.util.List[Message]) {
     this(NoCompressionCodec, messages)
   }
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 078ebb4..fb45bd3 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -18,16 +18,15 @@
 package kafka.message
 
 import scala.reflect.BeanProperty
-import kafka.utils.Logging
+import kafka.utils.{Logging, IteratorTemplate}
 import java.nio.ByteBuffer
 import java.nio.channels._
 import java.io.{InputStream, ByteArrayOutputStream, DataOutputStream}
 import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.IteratorTemplate
 
 object ByteBufferMessageSet {
   
-  private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = {
+  private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, partitionKey: Array[Byte], messages: Message*): ByteBuffer = {
     if(messages.size == 0) {
       MessageSet.Empty.buffer
     } else if(compressionCodec == NoCompressionCodec) {
@@ -51,7 +50,7 @@ object ByteBufferMessageSet {
         output.close()
       }
       val bytes = byteArrayStream.toByteArray
-      val message = new Message(bytes, compressionCodec)
+      val message = new Message(bytes, partitionKey, compressionCodec)
       val buffer = ByteBuffer.allocate(message.size + MessageSet.LogOverhead)
       writeMessage(buffer, message, offset)
       buffer.rewind()
@@ -98,12 +97,20 @@ object ByteBufferMessageSet {
 class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet with Logging {
   private var shallowValidByteCount = -1
 
+  def this(compressionCodec: CompressionCodec, partitionKey: Array[Byte], messages: Message*) {
+    this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, partitionKey, messages:_*))
+  }
+
   def this(compressionCodec: CompressionCodec, messages: Message*) {
-    this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, messages:_*))
+    this(ByteBufferMessageSet.create(new AtomicLong(0), compressionCodec, null, messages:_*))
+  }
+
+  def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, partitionKey: Array[Byte], messages: Message*) {
+    this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, partitionKey, messages:_*))
   }
   
   def this(compressionCodec: CompressionCodec, offsetCounter: AtomicLong, messages: Message*) {
-    this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, messages:_*))
+    this(ByteBufferMessageSet.create(offsetCounter, compressionCodec, null, messages:_*))
   }
 
   def this(messages: Message*) {
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 2e36d3b..2b6e60b 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -294,6 +294,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       *    Enable compression only for specified topics if any
       *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
       *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
+      *
+      *  When compression is enabled, the partition id is used as the key of the wrapper (compressed) message
       */
 
     val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
@@ -308,12 +310,12 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
               case 0 =>
                 debug("Sending %d messages with compression codec %d to %s"
                   .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
+                new ByteBufferMessageSet(config.compressionCodec, topicAndPartition.partition.toString.getBytes, rawMessages: _*)
               case _ =>
                 if(config.compressedTopics.contains(topicAndPartition.topic)) {
                   debug("Sending %d messages with compression codec %d to %s"
                     .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                  new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
+                  new ByteBufferMessageSet(config.compressionCodec, topicAndPartition.partition.toString.getBytes, rawMessages: _*)
                 }
                 else {
                   debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 6fb545a..7b560e8 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -127,7 +127,7 @@ object MirrorMaker extends Logging {
 
     var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil
     try {
-      streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
+      streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder(), true)).flatten
     } catch {
       case t =>
         fatal("Unable to create stream - shutting down mirror maker.")
