diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
new file mode 100644
index 0000000..752a4fc
--- /dev/null
+++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+
+import kafka.utils._
+
+private class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner[Array[Byte]] { // props is unused; presumably kept so the class can be constructed reflectively from config — TODO confirm against Utils.createObject
+  def partition(key: Array[Byte], numPartitions: Int): Int = { // maps a byte-array key to a partition index in [0, numPartitions)
+    Utils.abs(java.util.Arrays.hashCode(key)) % numPartitions // Arrays.hashCode is content-based (equal arrays hash equal); Utils.abs keeps the index non-negative
+  }
+}
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 2d93947..062573f 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -22,6 +22,7 @@ import kafka.utils.{Utils, CommandLineUtils, Logging}
 import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
 import scala.collection.JavaConversions._
 import java.util.concurrent.CountDownLatch
+import java.nio.ByteBuffer
 import kafka.consumer._
 import kafka.serializer._
 import collection.mutable.ListBuffer
@@ -99,8 +100,15 @@ object MirrorMaker extends Logging {
     val bufferSize = options.valueOf(bufferSizeOpt).intValue()
 
     val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => {
-      val config = new ProducerConfig(
-        Utils.loadProps(options.valueOf(producerConfigOpt)))
+      val props = Utils.loadProps(options.valueOf(producerConfigOpt))
+      val config = props.getProperty("partitioner.class") match {
+        case null => // no partitioner configured: default to the byte-array aware one
+          new ProducerConfig(props) {
+            override val partitionerClass = "kafka.producer.ByteArrayPartitioner"
+          }
+        case _ => // user configured a partitioner explicitly; honor it
+          new ProducerConfig(props)
+      }
       new Producer[Array[Byte], Array[Byte]](config)
     })
 
@@ -125,7 +133,7 @@ object MirrorMaker extends Logging {
     val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
 
     val consumerThreads =
-      streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
+      streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, producers, streamAndIndex._2))
 
     val producerThreads = new ListBuffer[ProducerThread]()
 
@@ -162,6 +170,7 @@ object MirrorMaker extends Logging {
 
   class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
                           producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]],
+                          producers: Seq[Producer[Array[Byte], Array[Byte]]],
                           threadId: Int)
           extends Thread with Logging {
 
@@ -174,8 +183,23 @@
       info("Starting mirror maker thread " + threadName)
       try {
         for (msgAndMetadata <- stream) {
-          val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
-          producerDataChannel.sendRequest(pd)
+          // If the key of the message is null, put it into the universal channel.
+          // Otherwise route it to a key-determined producer, so all messages
+          // with the same key flow through the same producer (preserves order).
+          msgAndMetadata.key match {
+            case null =>
+              trace("Sending the non-keyed message %s to the producer channel.".format(Utils.readString(ByteBuffer.wrap(msgAndMetadata.message))))
+              val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
+              producerDataChannel.sendRequest(pd)
+            case key: Array[Byte] =>
+              // Utils.abs guards against a negative hashCode, which would
+              // otherwise yield a negative index into producers and throw.
+              val producerId = Utils.abs(java.util.Arrays.hashCode(key)) % producers.size
+              trace("Send the message %s with key %s to producer %d.".format(Utils.readString(ByteBuffer.wrap(msgAndMetadata.message)), Utils.readString(ByteBuffer.wrap(key)), producerId))
+              val producer = producers(producerId)
+              val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, key, msgAndMetadata.message)
+              producer.send(pd)
+          }
         }
       } catch {
         case e =>
