diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a167756..9d142a5 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import java.util.Random
 import java.util.Properties
 import kafka.api.{TopicMetadata, PartitionMetadata}
-import kafka.cluster.Broker
+import kafka.cluster.{Broker, Cluster}
 import kafka.log.LogConfig
 import kafka.utils.{Logging, ZkUtils, Json}
 import org.I0Itec.zkclient.ZkClient
@@ -57,10 +57,12 @@ object AdminUtils extends Logging {
    * p7        p8        p9        p5        p6       (3nd replica)
    */
   def assignReplicasToBrokers(brokerList: Seq[Int],
+                              filteredCluster: Cluster,
                               nPartitions: Int,
                               replicationFactor: Int,
                               fixedStartIndex: Int = -1,
-                              startPartitionId: Int = -1)
+                              startPartitionId: Int = -1,
+                              maxReplicaPerRack: Int = -1)
   : Map[Int, Seq[Int]] = {
     if (nPartitions <= 0)
       throw new AdminOperationException("number of partitions must be larger than 0")
@@ -79,8 +81,33 @@ object AdminUtils extends Logging {
         nextReplicaShift += 1
       val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
       var replicaList = List(brokerList(firstReplicaIndex))
-      for (j <- 0 until replicationFactor - 1)
-        replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
+      if (maxReplicaPerRack <= 0) {
+        for (j <- 0 until replicationFactor - 1)
+          replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
+      } else {
+        var rackReplicaCount: mutable.Map[Int, Int] = mutable.Map(filteredCluster.getBroker(brokerList(firstReplicaIndex)).get.rack -> 1)
+        var k = 0
+        for (j <- 0 until replicationFactor - 1) {
+          var done = false;
+          while (!done && k < brokerList.size) {
+            val broker = brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, k, brokerList.size))
+            val rack = filteredCluster.getBroker(broker).get.rack
+            if (!(rackReplicaCount contains rack)) {
+              replicaList ::= broker
+              rackReplicaCount += (rack -> 1)
+              done = true;
+            } else if (rackReplicaCount(rack) < maxReplicaPerRack) {
+              rackReplicaCount(rack) = rackReplicaCount(rack) + 1
+              replicaList ::= broker
+              done = true;
+            }
+            k = k + 1
+          }
+          if (!done) {
+            throw new AdminOperationException("not enough brokers available in unique racks to meet maxReplicaPerRack limit of " + maxReplicaPerRack)
+          }
+        }
+      }
       ret.put(currentPartitionId, replicaList.reverse)
       currentPartitionId = currentPartitionId + 1
     }
@@ -89,10 +116,12 @@ object AdminUtils extends Logging {
 
   def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
     val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
-    if (existingPartitionsReplicaList.size == 0)
+    val existingMaxRackReplication = ZkUtils.getMaxRackReplicationForTopics(zkClient, List(topic))
+    if (existingPartitionsReplicaList.size == 0 || existingMaxRackReplication.size == 0)
       throw new AdminOperationException("The topic %s does not exist".format(topic))
 
     val existingReplicaList = existingPartitionsReplicaList.head._2
+    val maxReplicaPerRack = existingMaxRackReplication.head._2
     val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
     if (partitionsToAdd <= 0)
       throw new AdminOperationException("The number of partitions for a topic can only be increased")
@@ -100,7 +129,7 @@ object AdminUtils extends Logging {
     // create the new partition replication list
     val brokerList = ZkUtils.getSortedBrokerList(zkClient)
     val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
-      AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
+      AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size, maxReplicaPerRack)
     else
       getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
 
@@ -114,7 +143,7 @@ object AdminUtils extends Logging {
     val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
     // add the new list
     partitionReplicaList ++= newPartitionReplicaList
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, maxReplicaPerRack = maxReplicaPerRack, update = true)
   }
 
   def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
@@ -151,16 +180,18 @@ object AdminUtils extends Logging {
                   topic: String,
                   partitions: Int, 
                   replicationFactor: Int, 
+                  maxReplicaPerRack: Int = -1,
                   topicConfig: Properties = new Properties) {
     val brokerList = ZkUtils.getSortedBrokerList(zkClient)
-    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)
+    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), partitions, replicationFactor, -1, -1, maxReplicaPerRack)
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig, maxReplicaPerRack)
   }
                   
   def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient,
                                                      topic: String,
                                                      partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                      config: Properties = new Properties,
+                                                     maxReplicaPerRack: Int = -1,
                                                      update: Boolean = false) {
     // validate arguments
     Topic.validate(topic)
@@ -176,13 +207,13 @@ object AdminUtils extends Logging {
     writeTopicConfig(zkClient, topic, config)
     
     // create the partition assignment
-    writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)
+    writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, maxReplicaPerRack, update)
   }
   
-  private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
+  private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], maxReplicaPerRack: Int, update: Boolean) {
     try {
       val zkPath = ZkUtils.getTopicPath(topic)
-      val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)))
+      val jsonPartitionData = ZkUtils.replicaAssignmentZkData(replicaAssignment.map(e => (e._1.toString -> e._2)), maxReplicaPerRack)
 
       if (!update) {
         info("Topic creation " + jsonPartitionData.toString)
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2637586..ac12aa6 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -94,7 +94,7 @@ object ReassignPartitionsCommand extends Logging {
     var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
     val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
     groupedByTopic.foreach { topicInfo =>
-      val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
+      val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, ZkUtils.getFilteredCluster(zkClient, brokerListToReassign), topicInfo._2.size,
         topicInfo._2.head._2.size)
       partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
     }
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 842c110..d27e91f 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -76,14 +76,15 @@ object TopicCommand {
   def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     val topic = opts.options.valueOf(opts.topicOpt)
     val configs = parseTopicConfigsToBeAdded(opts)
+    val maxReplicaPerRack = if (opts.options.has(opts.maxRackReplicationOpt)) opts.options.valueOf(opts.maxRackReplicationOpt).intValue else -1
     if (opts.options.has(opts.replicaAssignmentOpt)) {
       val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs)
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs, maxReplicaPerRack)
     } else {
       CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
       val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
       val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
-      AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)
+      AdminUtils.createTopic(zkClient, topic, partitions, replicas, maxReplicaPerRack, configs)
     }
     println("Created topic \"%s\".".format(topic))
   }
@@ -101,6 +102,9 @@ object TopicCommand {
         AdminUtils.changeTopicConfig(zkClient, topic, configs)
         println("Updated config for topic \"%s\".".format(topic))
       }
+      if(opts.options.has(opts.maxRackReplicationOpt)) {
+        Utils.croak("Changing the max-rack-replication is not supported.")
+      }
       if(opts.options.has(opts.partitionsOpt)) {
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +
           "logic or ordering of the messages will be affected")
@@ -245,6 +249,10 @@ object TopicCommand {
                            .withRequiredArg
                            .describedAs("replication factor")
                            .ofType(classOf[java.lang.Integer])
+    val maxRackReplicationOpt = parser.accepts("max-rack-replication", "The maximum number of replicas assigned to a single rack for each partition in the topic being created.")
+                           .withRequiredArg
+                           .describedAs("max rack replication")
+                           .ofType(classOf[java.lang.Integer])
     val replicaAssignmentOpt = parser.accepts("replica-assignment", "A list of manual partition-to-broker assignments for the topic being created.")
                            .withRequiredArg
                            .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 1d2f81b..2281913 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -89,7 +89,7 @@ object ClientUtils extends Logging{
   }
 
   /**
-   * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
+   * Parse a list of broker urls in the form host1:port1:rack1, host2:port2:rack2, ... 
    */
   def parseBrokerList(brokerListStr: String): Seq[Broker] = {
     val brokersStr = Utils.parseCsvList(brokerListStr)
@@ -100,7 +100,8 @@ object ClientUtils extends Logging{
       val brokerInfos = brokerStr.split(":")
       val hostName = brokerInfos(0)
       val port = brokerInfos(1).toInt
-      new Broker(brokerId, hostName, port)
+      val rack = if (brokerInfos.size > 2) brokerInfos(2).toInt else 0
+      new Broker(brokerId, hostName, port, rack)
     })
   }
   
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 9407ed2..fbc77fc 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -37,7 +37,11 @@ private[kafka] object Broker {
           val brokerInfo = m.asInstanceOf[Map[String, Any]]
           val host = brokerInfo.get("host").get.asInstanceOf[String]
           val port = brokerInfo.get("port").get.asInstanceOf[Int]
-          new Broker(id, host, port)
+          val rack = brokerInfo.get("rack") match {
+            case Some(r) => r.asInstanceOf[Int]
+            case None => 0
+          }
+          new Broker(id, host, port, rack)
         case None =>
           throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
       }
@@ -50,32 +54,34 @@ private[kafka] object Broker {
     val id = buffer.getInt
     val host = readShortString(buffer)
     val port = buffer.getInt
-    new Broker(id, host, port)
+    val rack = buffer.getInt
+    new Broker(id, host, port, rack)
   }
 }
 
-private[kafka] case class Broker(val id: Int, val host: String, val port: Int) {
+private[kafka] case class Broker(val id: Int, val host: String, val port: Int, val rack: Int) {
   
-  override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
+  override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port + ",rack:" + rack)
 
-  def getConnectionString(): String = host + ":" + port
+  def getConnectionString(): String = host + ":" + port + ":" + rack
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)
     writeShortString(buffer, host)
     buffer.putInt(port)
+    buffer.putInt(rack)
   }
 
-  def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
+  def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ + 4 /* rack id*/
 
   override def equals(obj: Any): Boolean = {
     obj match {
       case null => false
-      case n: Broker => id == n.id && host == n.host && port == n.port
+      case n: Broker => id == n.id && host == n.host && port == n.port && rack == n.rack
       case _ => false
     }
   }
   
-  override def hashCode(): Int = hashcode(id, host, port)
+  override def hashCode(): Int = hashcode(id, host, port, rack)
   
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a0267ae..a2efb98 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -52,7 +52,8 @@ class ControllerContext(val zkClient: ZkClient,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
                           new mutable.HashMap,
                         var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
-                          new mutable.HashSet) {
+                          new mutable.HashSet,
+                        var maxRackReplicaAssignment: mutable.Map[String, Int] = mutable.Map.empty) {
 
   private var liveBrokersUnderlying: Set[Broker] = Set.empty
   private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
@@ -463,8 +464,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
           "reassigned not yet caught up with the leader")
         val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
         val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
+        val existingMaxReplicaAssignment = controllerContext.maxRackReplicaAssignment(topicAndPartition.topic)
         //1. Update AR in ZK with OAR + RAR.
-        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
+        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq, existingMaxReplicaAssignment)
         //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
         updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
           newAndOldReplicas.toSeq)
@@ -488,7 +490,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
         stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
         //10. Update AR in ZK with RAR.
-        updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
+        val existingMaxReplicaAssignment = controllerContext.maxRackReplicaAssignment(topicAndPartition.topic)
+        updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas, existingMaxReplicaAssignment)
         //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
         removePartitionFromReassignedPartitions(topicAndPartition)
         info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
@@ -637,6 +640,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
     controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
+    controllerContext.maxRackReplicaAssignment = ZkUtils.getMaxRackReplicationForTopics(zkClient, controllerContext.allTopics.toSeq)
     controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
@@ -746,10 +750,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
-                                                 replicas: Seq[Int]) {
+                                                 replicas: Seq[Int],
+                                                 maxRackReplication: Int) {
     val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
     partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
-    updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
+    updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic, maxRackReplication)
     info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
     // update the assigned replica list after a successful zookeeper write
     controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
@@ -811,10 +816,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
-                                         newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
+                                         newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]],
+                                         maxRackReplication: Int) {
     try {
       val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic)
-      val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)))
+      val jsonPartitionMap = ZkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => (e._1.partition.toString -> e._2)), maxRackReplication)
       ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ac4262a..9047843 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -387,9 +387,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             controllerContext.allTopics = currentChildren
 
             val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
+            val maxRackReplicaAssignment = ZkUtils.getMaxRackReplicationForTopics(zkClient, newTopics.toSeq)
             controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
               !deletedTopics.contains(p._1.topic))
             controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
+            controllerContext.maxRackReplicaAssignment.++=(maxRackReplicaAssignment)
             info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
               deletedTopics, addedPartitionReplicaAssignment))
             if(newTopics.size > 0)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 29abc46..b705a1a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -574,7 +574,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ErrorMapping.UnknownTopicOrPartitionCode =>
           if (config.autoCreateTopicsEnable) {
             try {
-              AdminUtils.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+              AdminUtils.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor, config.defaultMaxRackReplication)
               info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                 .format(topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
             } catch {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3c3aafc..c171185 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -48,6 +48,12 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the broker id for this server */
   val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
 
+  /* the rack id for this server */
+  val rackId: Int = props.getIntInRange("broker.rack", 0, (0, Int.MaxValue))
+
+  /* the default max-rack-replication for auto topic creation */
+  val defaultMaxRackReplication = props.getIntInRange("default.max.rack.replication", -1, (-1, Int.MaxValue))
+
   /* the maximum size of message that the server can receive */
   val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
   
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 9dca55c..263f9b0 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -33,6 +33,7 @@ import java.net.InetAddress
  * we are dead.
  */
 class KafkaHealthcheck(private val brokerId: Int, 
+                       private val rackId: Int,
                        private val advertisedHost: String, 
                        private val advertisedPort: Int,
                        private val zkSessionTimeoutMs: Int,
@@ -55,7 +56,7 @@ class KafkaHealthcheck(private val brokerId: Int,
       else
         advertisedHost
     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-    ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort)
+    ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, rackId, zkSessionTimeoutMs, jmxPort)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5e34f95..3e55ce5 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -99,7 +99,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     topicConfigManager.startup()
     
     /* tell everyone we are alive */
-    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.rackId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
     kafkaHealthcheck.startup()
 
     
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index b42e52b..b71b96d 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -185,11 +185,11 @@ object ZkUtils extends Logging {
     }
   }
 
-  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, rack: Int, timeout: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
-    val brokerInfo = Json.encode(Map("version" -> 1, "host" -> host, "port" -> port, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
-    val expectedBroker = new Broker(id, host, port)
+    val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "rack" -> rack, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
+    val expectedBroker = new Broker(id, host, port, rack)
 
     try {
       createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker,
@@ -220,8 +220,8 @@ object ZkUtils extends Logging {
   /**
    * Get JSON partition to replica map from zookeeper.
    */
-  def replicaAssignmentZkData(map: Map[String, Seq[Int]]): String = {
-    Json.encode(Map("version" -> 1, "partitions" -> map))
+  def replicaAssignmentZkData(map: Map[String, Seq[Int]], maxRackReplication: Int): String = {
+    Json.encode(Map("version" -> 2, "partitions" -> map, "max-rack-replication" -> maxRackReplication))
   }
 
   /**
@@ -504,6 +504,18 @@ object ZkUtils extends Logging {
     cluster
   }
 
+  /* Provide a Cluster object filtered with the given broker list. Useful when dealing with a sub-cluster */
+  def getFilteredCluster(zkClient: ZkClient, brokerList: Seq[Int]) : Cluster = {
+    val cluster = new Cluster
+    for (brokerId <- brokerList) {
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
+        case Some(brokerInfo) => cluster.add(Broker.createBroker(brokerId, brokerInfo))
+        case None => /* do nothing */
+      }
+    }
+    cluster
+  }
+
   def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: Set[TopicAndPartition])
   : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
@@ -516,6 +528,28 @@ object ZkUtils extends Logging {
     ret
   }
 
+  def getMaxRackReplicationForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, Int] = {
+    val ret = new mutable.HashMap[String, Int]()
+    topics.foreach { topic =>
+      val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+      jsonPartitionMapOpt match {
+        case Some(jsonPartitionMap) =>
+          Json.parseFull(jsonPartitionMap) match {
+            case Some(m) => m.asInstanceOf[Map[String, Any]].get("max-rack-replication") match {
+              case Some(repl)  =>
+                val maxRackReplication = repl.asInstanceOf[Int]
+                ret.put(topic, maxRackReplication)
+              case None =>
+                ret.put(topic, -1)
+            }
+            case None =>
+          }
+        case None =>
+      }
+    }
+    ret
+  }
+
   def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = {
     val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
     topics.foreach { topic =>
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 115e203..e4d739a 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -61,7 +61,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
 
     servers ++= List(server1, server2, server3, server4)
-    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))
+    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port, s.config.rackId))
 
     // create topics with 1 partition, 2 replicas, one on each broker
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic1, Map(0->Seq(0,1)))
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 59de1b4..d8c7e86 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -24,6 +24,7 @@ import kafka.utils._
 import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.{Logging, ZkUtils, TestUtils}
+import kafka.cluster.{Broker, Cluster}
 import kafka.common.{TopicExistsException, TopicAndPartition}
 import kafka.server.{KafkaServer, KafkaConfig}
 import java.io.File
@@ -37,12 +38,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
     // test 0 replication factor
     intercept[AdminOperationException] {
-      AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
+      AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), 10, 0)
     }
 
     // test wrong replication factor
     intercept[AdminOperationException] {
-      AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
+      AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), 10, 6)
     }
 
     // correct assignment
@@ -58,12 +59,74 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
         8 -> List(3, 0, 1),
         9 -> List(4, 1, 2))
 
-    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
+    val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, ZkUtils.getFilteredCluster(zkClient, brokerList), 10, 3, 0)
     val e = (expectedAssignment.toList == actualAssignment.toList)
     assertTrue(expectedAssignment.toList == actualAssignment.toList)
   }
 
   @Test
+  def testRackReplicaAssignment() {
+    val brokerList = List(0, 1, 2, 3, 4)
+
+    // test not enough distinct rack-ids
+    intercept[AdminOperationException] {
+      AdminUtils.assignReplicasToBrokers(brokerList, new Cluster(List(
+      Broker(0, "localhost", 0, 0), 
+      Broker(1, "localhost", 0, 1), 
+      Broker(2, "localhost", 0, 0), 
+      Broker(3, "localhost", 0, 1), 
+      Broker(4, "localhost", 0, 0))), 10, 3, maxReplicaPerRack = 1)
+    }
+
+    { // correct assignment with maxReplicaPerRack = 1
+      val expectedAssignment = Map(
+          0 -> List(0, 1, 2),
+          1 -> List(1, 2, 3),
+          2 -> List(2, 3, 4),
+          3 -> List(3, 4, 2),
+          4 -> List(4, 0, 2),
+          5 -> List(0, 2, 4),
+          6 -> List(1, 3, 2),
+          7 -> List(2, 4, 0),
+          8 -> List(3, 1, 2),
+          9 -> List(4, 2, 3))
+  
+      val filteredCluster = new Cluster(List(
+        Broker(0, "localhost", 0, 0), 
+        Broker(1, "localhost", 0, 1), 
+        Broker(2, "localhost", 0, 2), 
+        Broker(3, "localhost", 0, 0), 
+        Broker(4, "localhost", 0, 1)))
+      val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, filteredCluster, 10, 3, 0, maxReplicaPerRack = 1)
+      val e = (expectedAssignment.toList == actualAssignment.toList)
+      assertTrue(expectedAssignment.toList == actualAssignment.toList)
+    }
+    { // correct assignment with maxReplicaPerRack = 2
+      val expectedAssignment = Map(
+          0 -> List(0, 1, 2),
+          1 -> List(1, 2, 3),
+          2 -> List(2, 3, 4),
+          3 -> List(3, 4, 0),
+          4 -> List(4, 0, 1),
+          5 -> List(0, 2, 3),
+          6 -> List(1, 3, 4),
+          7 -> List(2, 4, 0),
+          8 -> List(3, 0, 1),
+          9 -> List(4, 1, 2))
+  
+      val filteredCluster = new Cluster(List(
+        Broker(0, "localhost", 0, 0), 
+        Broker(1, "localhost", 0, 1), 
+        Broker(2, "localhost", 0, 2), 
+        Broker(3, "localhost", 0, 0), 
+        Broker(4, "localhost", 0, 1)))
+      val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, filteredCluster, 10, 3, 0, maxReplicaPerRack = 2)
+      val e = (expectedAssignment.toList == actualAssignment.toList)
+      assertTrue(expectedAssignment.toList == actualAssignment.toList)
+    }
+  }
+
+  @Test
   def testManualReplicaAssignment() {
     val brokers = List(0, 1, 2, 3, 4)
     TestUtils.createBrokersInZk(zkClient, brokers)
@@ -377,7 +440,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       // create a topic with a few config overrides and check that they are applied
       val maxMessageSize = 1024
       val retentionMs = 1000*1000
-      AdminUtils.createTopic(server.zkClient, topic, partitions, 1, makeConfig(maxMessageSize, retentionMs))
+      AdminUtils.createTopic(server.zkClient, topic, partitions, 1, topicConfig = makeConfig(maxMessageSize, retentionMs))
       checkConfig(maxMessageSize, retentionMs)
 
       // now double the config values for the topic and check that it is applied
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 8df0982..b411487 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -75,7 +75,7 @@ object SerializationTestUtils{
     TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
   )
 
-  private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013))
+  private val brokers = List(new Broker(0, "localhost", 1011, 0), new Broker(1, "localhost", 1012, 0), new Broker(2, "localhost", 1013, 0))
   private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0)
   private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1)
   private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2)
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 9347ea6..57b9179 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -48,7 +48,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
   val group = "group1"
   val consumer0 = "consumer0"
   val consumedOffset = 5
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port, c.rackId)))
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
                                                            0,
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 47130d3..a5b9d72 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -40,7 +40,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     yield new KafkaConfig(props)
   val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
   val topic = "topic"
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port, c.rackId)))
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 9998a11..7650ba5 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -34,7 +34,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p))
   private var server1: KafkaServer = null
-  val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))
+  val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port, c.rackId))
 
   override def setUp() {
     super.setUp()
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 18e3555..210583f 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -167,8 +167,8 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val props = new Properties()
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    val broker1 = new Broker(0, "localhost", 9092)
-    val broker2 = new Broker(1, "localhost", 9093)
+    val broker1 = new Broker(0, "localhost", 9092, 0)
+    val broker2 = new Broker(1, "localhost", 9093, 0)
     broker1
     // form expected partitions metadata
     val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
@@ -403,7 +403,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val config = new ProducerConfig(props)
 
     val topic1 = "topic1"
-    val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092)
+    val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092, 0)
     val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
     topicPartitionInfos.put("topic1", topic1Metadata)
 
@@ -492,12 +492,12 @@ class AsyncProducerTest extends JUnit3Suite {
     producerDataList
   }
 
-  private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
-    getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort)
+  private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int, rackId: Int = 0): TopicMetadata = {
+    getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort, rackId)
   }
 
-  private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
-    val broker1 = new Broker(brokerId, brokerHost, brokerPort)
+  private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int, rackId: Int): TopicMetadata = {
+    val broker1 = new Broker(brokerId, brokerHost, brokerPort, rackId)
     new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
   
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 38e3ae7..a2215cf 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -123,7 +123,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     // start another controller
     val controllerId = 2
     val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
-    val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
+    val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port, s.config.rackId))
     val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d88b6c3..98f9066 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -345,13 +345,13 @@ object TestUtils extends Logging {
   }
 
   def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost", 6667))
-    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, 6000, jmxPort = -1))
+    val brokers = ids.map(id => new Broker(id, "localhost", 6667, 0))
+    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, b.rack, 6000, jmxPort = -1))
     brokers
   }
 
   def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost", 6667))
+    val brokers = ids.map(id => new Broker(id, "localhost", 6667, 0))
     brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b))
     brokers
   }
diff --git a/examples/README b/examples/README
index d33f6c5..61de286 100644
--- a/examples/README
+++ b/examples/README
@@ -6,10 +6,10 @@ messages from Kafka server.
 In order to run demo from SBT:
    1. Start Zookeeper and the Kafka server
    2. ./sbt from top-level kafka directory
-   3. Switch to the kafka java examples project -> project Kafka Java Examples
+   3. Switch to the kafka java examples project -> project kafka-examples
    4. execute run -> run
-   5. For unlimited producer-consumer run, select option 1
-      For simple consumer demo, select option 2
+   5. For simple consumer demo, select option 1
+      For unlimited producer-consumer run, select option 2
 
 To run the demo using scripts: 
 
