diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 2d93947..062573f 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -22,6 +22,7 @@ import kafka.utils.{Utils, CommandLineUtils, Logging}
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import scala.collection.JavaConversions._
 import java.util.concurrent.CountDownLatch
+import java.nio.ByteBuffer
 import kafka.consumer._
 import kafka.serializer._
 import collection.mutable.ListBuffer
@@ -99,8 +100,15 @@ object MirrorMaker extends Logging {
     val bufferSize = options.valueOf(bufferSizeOpt).intValue()
 
     val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
-      val config = new ProducerConfig(
-        Utils.loadProps(options.valueOf(producerConfigOpt)))
+      val props = Utils.loadProps(options.valueOf(producerConfigOpt))
+      val config = props.getProperty("partitioner.class") match {
+        case null =>
+          new ProducerConfig(props) {
+            override val partitionerClass = "kafka.producer.ByteArrayPartitioner"
+          }
+        case _ =>
+          new ProducerConfig(props)
+      }
       new Producer[Array[Byte], Array[Byte]](config)
     })
 
@@ -125,7 +133,7 @@ object MirrorMaker extends Logging {
     val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
 
     val consumerThreads =
-      streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
+      streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, producers, streamAndIndex._2))
 
     val producerThreads = new ListBuffer[ProducerThread]()
 
@@ -162,6 +170,7 @@ object MirrorMaker extends Logging {
 
   class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
                           producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]],
+                          producers: Seq[Producer[Array[Byte], Array[Byte]]],
                           threadId: Int)
           extends Thread with Logging {
 
@@ -174,8 +183,20 @@ object MirrorMaker extends Logging {
       info("Starting mirror maker thread " + threadName)
       try {
         for (msgAndMetadata <- stream) {
-          val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
-          producerDataChannel.sendRequest(pd)
+          // If the key of the message is null, put it into the universal channel
+          // Otherwise use a pre-assigned producer to send the message
+          msgAndMetadata.key match {
+            case null =>
+              trace("Send the non-keyed message %s to the producer channel.".format(Utils.readString(ByteBuffer.wrap(msgAndMetadata.message))))
+              val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
+              producerDataChannel.sendRequest(pd)
+            case key : Array[Byte] =>
+              val producerId = Utils.abs(java.util.Arrays.hashCode(key)) % producers.size
+              trace("Send the message %s with key %s to producer %d.".format(Utils.readString(ByteBuffer.wrap(msgAndMetadata.message)), Utils.readString(ByteBuffer.wrap(msgAndMetadata.key)), producerId))
+              val producer = producers(producerId)
+              val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message)
+              producer.send(pd)
+          }
         }
       } catch {
         case e =>
