From e16d4fa8db4307e0e54f51f81cfc4c348163f9b3 Mon Sep 17 00:00:00 2001
From: jmlvanre <joris.van.remoortere@gmail.com>
Date: Wed, 22 Jan 2014 18:13:05 -0800
Subject: [PATCH] Adding a rack-id to kafka config. This rack-id can be used
 during replica assignment by using the max-rack-replication
 argument in the admin scripts (create topic, etc.). By
 default the original replication assignment algorithm is
 used because max-rack-replication defaults to -1.
 max-rack-replication > -1 is not honored if you are doing
 manual replica assignment (preffered).

---
 .../scala/kafka/admin/AddPartitionsCommand.scala   | 12 +++--
 core/src/main/scala/kafka/admin/AdminUtils.scala   | 59 ++++++++++++++++++----
 .../scala/kafka/admin/CreateTopicCommand.scala     | 12 +++--
 .../kafka/admin/ReassignPartitionsCommand.scala    | 11 +++-
 core/src/main/scala/kafka/client/ClientUtils.scala |  5 +-
 core/src/main/scala/kafka/cluster/Broker.scala     | 19 ++++---
 core/src/main/scala/kafka/server/KafkaConfig.scala |  3 ++
 .../main/scala/kafka/server/KafkaZooKeeper.scala   |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  6 +--
 .../scala/other/kafka/TestLogPerformance.scala     |  2 +-
 .../scala/unit/kafka/admin/AddPartitionsTest.scala | 15 ++++--
 .../test/scala/unit/kafka/admin/AdminTest.scala    |  9 ++--
 .../api/RequestResponseSerializationTest.scala     |  2 +-
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |  2 +-
 .../kafka/integration/AutoOffsetResetTest.scala    |  2 +-
 .../scala/unit/kafka/integration/FetcherTest.scala |  2 +-
 .../kafka/integration/LazyInitProducerTest.scala   |  2 +-
 .../unit/kafka/integration/PrimitiveApiTest.scala  |  2 +-
 .../unit/kafka/integration/RollingBounceTest.scala | 13 +++--
 .../unit/kafka/integration/TopicMetadataTest.scala |  2 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  8 +--
 .../test/scala/unit/kafka/log/LogOffsetTest.scala  |  5 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  2 +-
 .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala  |  3 +-
 .../unit/kafka/producer/AsyncProducerTest.scala    | 14 ++---
 .../scala/unit/kafka/producer/ProducerTest.scala   | 10 ++--
 .../unit/kafka/server/LeaderElectionTest.scala     | 12 +++--
 .../unit/kafka/server/ServerShutdownTest.scala     |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 17 ++++---
 29 files changed, 168 insertions(+), 87 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
index 7f03708..b32f59b 100644
--- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
@@ -46,6 +46,11 @@ object AddPartitionsCommand extends Logging {
       "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
       .ofType(classOf[String])
       .defaultsTo("")
+    val rackReplicationOpt = parser.accepts("max-rack-replication", "maximum replicas assigned to a single rack")
+      .withRequiredArg
+      .describedAs("max # of replicas per rack")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(-1)
 
     val options = parser.parse(args : _*)
 
@@ -62,10 +67,11 @@ object AddPartitionsCommand extends Logging {
     val zkConnect = options.valueOf(zkConnectOpt)
     val nPartitions = options.valueOf(nPartitionsOpt).intValue
     val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
+    val rackReplication = options.valueOf(rackReplicationOpt).intValue
     var zkClient: ZkClient = null
     try {
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+      addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, rackReplication)
       println("adding partitions succeeded!")
     } catch {
       case e: Throwable =>
@@ -77,7 +83,7 @@ object AddPartitionsCommand extends Logging {
     }
   }
 
-  def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
+  def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", rackReplication: Int = -1) {
     val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
     if (existingPartitionsReplicaList.size == 0)
       throw new AdministrationException("The topic %s does not exist".format(topic))
@@ -87,7 +93,7 @@ object AddPartitionsCommand extends Logging {
     // create the new partition replication list
     val brokerList = ZkUtils.getSortedBrokerList(zkClient)
     val newPartitionReplicaList = if (replicaAssignmentStr == "")
-      AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
+      AdminUtils.assignReplicasToBrokers(zkClient, brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size, rackReplication)
     else
       getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
 
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index d6ab275..a898345 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -47,8 +47,8 @@ object AdminUtils extends Logging {
    * p3        p4        p0        p1        p2       (3nd replica)
    * p7        p8        p9        p5        p6       (3nd replica)
    */
-  def assignReplicasToBrokers(brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
-                              fixedStartIndex: Int = -1, startPartitionId: Int = -1)
+  def assignReplicasToBrokers(zkClient: ZkClient, brokerList: Seq[Int], nPartitions: Int, replicationFactor: Int,
+                              fixedStartIndex: Int = -1, startPartitionId: Int = -1, maxReplicaPerRack: Int = -1)
   : Map[Int, Seq[Int]] = {
     if (nPartitions <= 0)
       throw new AdministrationException("number of partitions must be larger than 0")
@@ -62,15 +62,52 @@ object AdminUtils extends Logging {
     var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
 
     var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
-    for (i <- 0 until nPartitions) {
-      if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
-        nextReplicaShift += 1
-      val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
-      var replicaList = List(brokerList(firstReplicaIndex))
-      for (j <- 0 until replicationFactor - 1)
-        replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
-      ret.put(currentPartitionId, replicaList.reverse)
-      currentPartitionId = currentPartitionId + 1
+    if (maxReplicaPerRack <= 0) {
+      for (i <- 0 until nPartitions) {
+        if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
+          nextReplicaShift += 1
+        val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
+        var replicaList = List(brokerList(firstReplicaIndex))
+        for (j <- 0 until replicationFactor - 1)
+          replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
+        ret.put(currentPartitionId, replicaList.reverse)
+        currentPartitionId = currentPartitionId + 1
+      }
+    } else {
+      val brokerToRackMap: Map[Int, Int] = brokerList.map(brokerId => (brokerId -> (ZkUtils.getBrokerInfo(zkClient, brokerId) match {
+        case Some(broker) => broker.rack
+        case None => throw new AdministrationException("broker " + brokerId + " must have rack id")
+      }) )).toMap
+      for (i <- 0 until nPartitions) {
+        if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
+          nextReplicaShift += 1
+        val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
+        var replicaList = List(brokerList(firstReplicaIndex))
+        var rackReplicaCount: mutable.Map[Int, Int] = mutable.Map(brokerToRackMap(brokerList(firstReplicaIndex)) -> 1)
+        var k = 0
+        for (j <- 0 until replicationFactor - 1) {
+          var done = false;
+          while (!done && k < brokerList.size) {
+            val broker = brokerList(getWrappedIndex(firstReplicaIndex, nextReplicaShift, k, brokerList.size))
+            val rack = brokerToRackMap(broker)
+            if (!(rackReplicaCount contains rack)) {
+              replicaList ::= broker
+              rackReplicaCount += (rack -> 1)
+              done = true;
+            } else if (rackReplicaCount(rack) < maxReplicaPerRack) {
+              rackReplicaCount(rack) = rackReplicaCount(rack) + 1
+              replicaList ::= broker
+              done = true;
+            }
+            k = k + 1
+          }
+          if (!done) {
+            throw new AdministrationException("not enough brokers available in unique racks to meet maxReplicaPerRack limit of " + maxReplicaPerRack)
+          }
+        }
+        ret.put(currentPartitionId, replicaList.reverse)
+        currentPartitionId = currentPartitionId + 1
+      }
     }
     ret.toMap
   }
diff --git a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
index 84c2095..ef5229b 100644
--- a/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
@@ -52,6 +52,11 @@ object CreateTopicCommand extends Logging {
                                         "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
                            .ofType(classOf[String])
                            .defaultsTo("")
+    val rackReplicationOpt = parser.accepts("max-rack-replication", "maximum replicas assigned to a single rack")
+                           .withRequiredArg
+                           .describedAs("max # of replicas per rack")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(-1)
 
     val options = parser.parse(args : _*)
 
@@ -68,10 +73,11 @@ object CreateTopicCommand extends Logging {
     val nPartitions = options.valueOf(nPartitionsOpt).intValue
     val replicationFactor = options.valueOf(replicationFactorOpt).intValue
     val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
+    val rackReplication = options.valueOf(rackReplicationOpt).intValue
     var zkClient: ZkClient = null
     try {
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
+      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr, rackReplication)
       println("creation succeeded!")
     } catch {
       case e: Throwable =>
@@ -83,13 +89,13 @@ object CreateTopicCommand extends Logging {
     }
   }
 
-  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
+  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "", rackReplication: Int = -1) {
     Topic.validate(topic)
 
     val brokerList = ZkUtils.getSortedBrokerList(zkClient)
 
     val partitionReplicaAssignment = if (replicaAssignmentStr == "")
-      AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
+      AdminUtils.assignReplicasToBrokers(zkClient, brokerList, numPartitions, replicationFactor, -1, -1, rackReplication)
     else
       getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
     debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2f706c9..d30faf8 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -64,6 +64,12 @@ object ReassignPartitionsCommand extends Logging {
       .describedAs("partition reassignment json file path")
       .ofType(classOf[String])
 
+    val rackReplicationOpt = parser.accepts("max-rack-replication", "maximum replicas assigned to a single rack")
+      .withRequiredArg
+      .describedAs("max # of replicas per rack")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(-1)
+
     val options = parser.parse(args : _*)
 
     for(arg <- List(zkConnectOpt)) {
@@ -110,11 +116,12 @@ object ReassignPartitionsCommand extends Logging {
         val brokerListToReassign = brokerList.split(',') map (_.toInt)
         val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
         val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
+        val rackReplication = options.valueOf(rackReplicationOpt).intValue
 
         val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
         groupedByTopic.foreach { topicInfo =>
-          val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
-            topicInfo._2.head._2.size)
+          val assignedReplicas = AdminUtils.assignReplicasToBrokers(zkClient, brokerListToReassign, topicInfo._2.size,
+            topicInfo._2.head._2.size, -1, -1, rackReplication)
           partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
         }
 
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 1d2f81b..17228a6 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -89,7 +89,7 @@ object ClientUtils extends Logging{
   }
 
   /**
-   * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
+   * Parse a list of broker urls in the form host1:port1:rack1, host2:port2:rack2, ... 
    */
   def parseBrokerList(brokerListStr: String): Seq[Broker] = {
     val brokersStr = Utils.parseCsvList(brokerListStr)
@@ -100,7 +100,8 @@ object ClientUtils extends Logging{
       val brokerInfos = brokerStr.split(":")
       val hostName = brokerInfos(0)
       val port = brokerInfos(1).toInt
-      new Broker(brokerId, hostName, port)
+      val rack = brokerInfos(2).toInt
+      new Broker(brokerId, hostName, port, rack)
     })
   }
   
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 9407ed2..7c2d4a2 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -37,7 +37,8 @@ private[kafka] object Broker {
           val brokerInfo = m.asInstanceOf[Map[String, Any]]
           val host = brokerInfo.get("host").get.asInstanceOf[String]
           val port = brokerInfo.get("port").get.asInstanceOf[Int]
-          new Broker(id, host, port)
+          val rack = brokerInfo.get("rack").get.asInstanceOf[Int]
+          new Broker(id, host, port, rack)
         case None =>
           throw new BrokerNotAvailableException("Broker id %d does not exist".format(id))
       }
@@ -50,32 +51,34 @@ private[kafka] object Broker {
     val id = buffer.getInt
     val host = readShortString(buffer)
     val port = buffer.getInt
-    new Broker(id, host, port)
+    val rack = buffer.getInt
+    new Broker(id, host, port, rack)
   }
 }
 
-private[kafka] case class Broker(val id: Int, val host: String, val port: Int) {
+private[kafka] case class Broker(val id: Int, val host: String, val port: Int, val rack: Int) {
   
-  override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
+  override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port + ",rack:" + rack)
 
-  def getConnectionString(): String = host + ":" + port
+  def getConnectionString(): String = host + ":" + port + ":" + rack
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)
     writeShortString(buffer, host)
     buffer.putInt(port)
+    buffer.putInt(rack)
   }
 
-  def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/
+  def sizeInBytes: Int = shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id*/ + 4 /* rack id*/
 
   override def equals(obj: Any): Boolean = {
     obj match {
       case null => false
-      case n: Broker => id == n.id && host == n.host && port == n.port
+      case n: Broker => id == n.id && host == n.host && port == n.port && rack == n.rack
       case _ => false
     }
   }
   
-  override def hashCode(): Int = hashcode(id, host, port)
+  override def hashCode(): Int = hashcode(id, host, port, rack)
   
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 41c9626..3290541 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,6 +37,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the broker id for this server */
   val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
 
+  /* the rack id for this server */
+  val rackId: Int = props.getIntInRange("broker.rack", (0, Int.MaxValue))
+
   /* the maximum size of message that the server can receive */
   val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
   
diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
index 553640f..f96a548 100644
--- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
+++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
@@ -48,7 +48,7 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging {
       else
         config.hostName 
     val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, config.zkSessionTimeoutMs, jmxPort)
+    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, config.port, config.rackId, config.zkSessionTimeoutMs, jmxPort)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index c21bc60..a443882 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -183,14 +183,14 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
-  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, rack: Int, timeout: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val timestamp = "\"" + SystemTime.milliseconds.toString + "\""
     val brokerInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
-                             Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
+                             Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "rack" -> rack.toString, "timestamp" -> timestamp),
                                                    valueInQuotes = false))
-    val expectedBroker = new Broker(id, host, port)
+    val expectedBroker = new Broker(id, host, port, rack)
 
     try {
       createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker,
diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala
index 9f3bb40..0443c97 100644
--- a/core/src/test/scala/other/kafka/TestLogPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala
@@ -30,7 +30,7 @@ object TestLogPerformance {
     val messageSize = args(1).toInt
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
-    val props = TestUtils.createBrokerConfig(0, -1)
+    val props = TestUtils.createBrokerConfig(0, -1, 1)
     val config = new KafkaConfig(props)
     val dir = TestUtils.tempDir()
     val log = new Log(dir, 50*1024*1024, config.messageMaxBytes, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 2436289..1ef1744 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -37,10 +37,15 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port3 = TestUtils.choosePort()
   val port4 = TestUtils.choosePort()
 
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
-  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
+  val rack1 = 1
+  val rack2 = 2
+  val rack3 = 3
+  val rack4 = 4
+
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, rack1)
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, rack2)
+  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, rack3)
+  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, rack4)
 
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
   var brokers: Seq[Broker] = Seq.empty[Broker]
@@ -61,7 +66,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
 
     servers ++= List(server1, server2, server3, server4)
-    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))
+    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port, s.config.rackId))
 
     // create topics with 1 partition, 2 replicas, one on each broker
     CreateTopicCommand.createTopic(zkClient, topic1, 1, 2, "0:1")
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index a480881..71a228d 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -24,7 +24,6 @@ import kafka.server.KafkaConfig
 import kafka.utils.{Logging, ZkUtils, TestUtils}
 import kafka.common.{TopicExistsException, ErrorMapping, TopicAndPartition}
 
-
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   @Test
@@ -33,7 +32,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
     // test 0 replication factor
     try {
-      AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
+      AdminUtils.assignReplicasToBrokers(zkClient, brokerList, 10, 0)
       fail("shouldn't allow replication factor 0")
     }
     catch {
@@ -43,7 +42,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
     // test wrong replication factor
     try {
-      AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
+      AdminUtils.assignReplicasToBrokers(zkClient, brokerList, 10, 6)
       fail("shouldn't allow replication factor larger than # of brokers")
     }
     catch {
@@ -66,7 +65,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
         9 -> List(4, 1, 2)
       )
 
-      val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
+      val actualAssignment = AdminUtils.assignReplicasToBrokers(zkClient, brokerList, 10, 3, 0)
       val e = (expectedAssignment.toList == actualAssignment.toList)
       assertTrue(expectedAssignment.toList == actualAssignment.toList)
     }
@@ -155,7 +154,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       11 -> 1
     )
     val topic = "test"
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
+    TestUtils.createBrokersInZk(zkClient, List((0, 0), (1, 1), (2, 2), (3, 3), (4, 4)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index f43ac8f..6ca5020 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -75,7 +75,7 @@ object SerializationTestUtils{
     TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
   )
 
-  private val brokers = List(new Broker(0, "localhost", 1011), new Broker(1, "localhost", 1012), new Broker(2, "localhost", 1013))
+  private val brokers = List(new Broker(0, "localhost", 1011, 1), new Broker(1, "localhost", 1012, 2), new Broker(2, "localhost", 1013, 3))
   private val partitionMetaData0 = new PartitionMetadata(0, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 0)
   private val partitionMetaData1 = new PartitionMetadata(1, Some(brokers.head), replicas = brokers, isr = brokers.tail, errorCode = 1)
   private val partitionMetaData2 = new PartitionMetadata(2, Some(brokers.head), replicas = brokers, isr = brokers, errorCode = 2)
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 1ee34b9..e283d7f 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -47,7 +47,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
   val group = "group1"
   val consumer0 = "consumer0"
   val consumedOffset = 5
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port, c.rackId)))
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
                                                            0,
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 2317760..fc6a380 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -33,7 +33,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
   val group = "default_group"
   val testConsumer = "consumer"
   val BrokerPort = 9892
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort)))
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, BrokerPort, 0)))
   val NumMessages = 10
   val LargeOffset = 10000
   val SmallOffset = -1
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index c5cddea..d2eeaaf 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -40,7 +40,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
     yield new KafkaConfig(props)
   val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
   val topic = "topic"
-  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port, c.rackId)))
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
index c3c7631..bf801a9 100644
--- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
@@ -34,7 +34,7 @@ import org.junit.Assert.assertEquals
 class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness {
 
   val port = TestUtils.choosePort
-  val props = TestUtils.createBrokerConfig(0, port)
+  val props = TestUtils.createBrokerConfig(0, port, 0)
   val config = new KafkaConfig(props)
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index f764151..69ce626 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -37,7 +37,7 @@ import kafka.utils.{TestUtils, Utils}
 class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
 
   val port = TestUtils.choosePort
-  val props = TestUtils.createBrokerConfig(0, port)
+  val props = TestUtils.createBrokerConfig(0, port, 0)
   val config = new KafkaConfig(props)
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index 26e9bd6..a9b826f 100644
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -39,14 +39,19 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port3 = TestUtils.choosePort()
   val port4 = TestUtils.choosePort()
 
+  val rackId1 = 0
+  val rackId2 = 1
+  val rackId3 = 2
+  val rackId4 = 3
+
   val enableShutdown = true
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, rackId1)
   configProps1.put("controlled.shutdown.enable", "true")
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, rackId2)
   configProps2.put("controlled.shutdown.enable", "true")
-  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3)
+  val configProps3 = TestUtils.createBrokerConfig(brokerId3, port3, rackId3)
   configProps3.put("controlled.shutdown.enable", "true")
-  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4)
+  val configProps4 = TestUtils.createBrokerConfig(brokerId4, port4, rackId4)
   configProps4.put("controlled.shutdown.enable", "true")
   configProps4.put("controlled.shutdown.retry.backoff.ms", "100")
 
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index edf8555..4d406ed 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -34,7 +34,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p))
   private var server1: KafkaServer = null
-  val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))
+  val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port,c.rackId))
 
   override def setUp() {
     super.setUp()
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index ce893bf..89fb517 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -39,7 +39,7 @@ class LogManagerTest extends JUnit3Suite {
 
   override def setUp() {
     super.setUp()
-    config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
+    config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1, 0)) {
                    override val logSegmentBytes = 1024
                    override val logFlushIntervalMessages = 10000
                    override val logRetentionHours = maxLogAgeHours
@@ -111,7 +111,7 @@ class LogManagerTest extends JUnit3Suite {
     val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
     val retentionHours = 1
     val retentionMs = 1000 * 60 * 60 * retentionHours
-    val props = TestUtils.createBrokerConfig(0, -1)
+    val props = TestUtils.createBrokerConfig(0, -1, 0)
     logManager.shutdown()
     config = new KafkaConfig(props) {
       override val logSegmentBytes = (10 * (setSize - 1)) // each segment will be 10 messages
@@ -155,7 +155,7 @@ class LogManagerTest extends JUnit3Suite {
 
   @Test
   def testTimeBasedFlush() {
-    val props = TestUtils.createBrokerConfig(0, -1)
+    val props = TestUtils.createBrokerConfig(0, -1, 0)
     logManager.shutdown()
     config = new KafkaConfig(props) {
                    override val logSegmentBytes = 1024 *1024 *1024
@@ -179,7 +179,7 @@ class LogManagerTest extends JUnit3Suite {
   @Test
   def testLeastLoadedAssignment() {
     // create a log manager with multiple data directories
-    val props = TestUtils.createBrokerConfig(0, -1)
+    val props = TestUtils.createBrokerConfig(0, -1, 0)
     val dirs = Seq(TestUtils.tempDir().getAbsolutePath, 
                    TestUtils.tempDir().getAbsolutePath, 
                    TestUtils.tempDir().getAbsolutePath)
diff --git a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
index 1a9cc01..79153c5 100644
--- a/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
@@ -48,7 +48,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    val config: Properties = createBrokerConfig(1, brokerPort)
+    val config: Properties = createBrokerConfig(1, brokerPort, 1)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()
@@ -196,9 +196,10 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(Seq(0L), consumerOffsets)
   }
 
-  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+  private def createBrokerConfig(nodeId: Int, port: Int, rack: Int): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)
+    props.put("broker.rack", rack.toString)
     props.put("port", port.toString)
     props.put("log.dir", getLogDir.getAbsolutePath)
     props.put("log.flush.interval.messages", "1")
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index df90695..d240d7c 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -38,7 +38,7 @@ class LogTest extends JUnitSuite {
   @Before
   def setUp() {
     logDir = TestUtils.tempDir()
-    val props = TestUtils.createBrokerConfig(0, -1)
+    val props = TestUtils.createBrokerConfig(0, -1, 0)
     config = new KafkaConfig(props)
   }
 
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 67497dd..c703f8e 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -43,6 +43,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
   val tLogger = Logger.getLogger(getClass())
 
   private val brokerZk = 0
+  private val rackZk = 0
 
   private val ports = TestUtils.choosePorts(2)
   private val portZk = ports(0)
@@ -51,7 +52,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
   override def setUp() {
     super.setUp()
 
-    val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
+    val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk, rackZk)
     val logDirZkPath = propsZk.getProperty("log.dir")
     logDirZk = new File(logDirZkPath)
     config = new KafkaConfig(propsZk)
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 69c88c7..484a54e 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -166,8 +166,8 @@ class AsyncProducerTest extends JUnit3Suite {
 
     val props = new Properties()
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(configs))
-    val broker1 = new Broker(0, "localhost", 9092)
-    val broker2 = new Broker(1, "localhost", 9093)
+    val broker1 = new Broker(0, "localhost", 9092, 0)
+    val broker2 = new Broker(1, "localhost", 9093, 1)
     broker1
     // form expected partitions metadata
     val partition1Metadata = new PartitionMetadata(0, Some(broker1), List(broker1, broker2))
@@ -401,7 +401,7 @@ class AsyncProducerTest extends JUnit3Suite {
     val config = new ProducerConfig(props)
 
     val topic1 = "topic1"
-    val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092)
+    val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092, 0)
     val topicPartitionInfos = new collection.mutable.HashMap[String, TopicMetadata]
     topicPartitionInfos.put("topic1", topic1Metadata)
 
@@ -488,12 +488,12 @@ class AsyncProducerTest extends JUnit3Suite {
     producerDataList
   }
 
-  private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
-    getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort)
+  private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost: String, brokerPort: Int, rackId: Int = -1): TopicMetadata = {
+    getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort, rackId)
   }
 
-  private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int): TopicMetadata = {
-    val broker1 = new Broker(brokerId, brokerHost, brokerPort)
+  private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost: String, brokerPort: Int, rackId: Int): TopicMetadata = {
+    val broker1 = new Broker(brokerId, brokerHost, brokerPort, rackId)
     new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
   
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 2cabfbb..4aa9daa 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -40,6 +40,8 @@ import org.junit.Assert.assertEquals
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   private val brokerId1 = 0
   private val brokerId2 = 1
+  private val rackId1 = 0
+  private val rackId2 = 1
   private val ports = TestUtils.choosePorts(2)
   private val (port1, port2) = (ports(0), ports(1))
   private var server1: KafkaServer = null
@@ -49,12 +51,12 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
   private var servers = List.empty[KafkaServer]
 
-  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
+  private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, rackId1)
   private val config1 = new KafkaConfig(props1) {
     override val hostName = "localhost"
     override val numPartitions = 4
   }
-  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, rackId2)
   private val config2 = new KafkaConfig(props2) {
     override val hostName = "localhost"
     override val numPartitions = 4
@@ -99,7 +101,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 
     val props1 = new util.Properties()
-    props1.put("metadata.broker.list", "localhost:80,localhost:81")
+    props1.put("metadata.broker.list", "localhost:80:0,localhost:81:1")
     props1.put("serializer.class", "kafka.serializer.StringEncoder")
     val producerConfig1 = new ProducerConfig(props1)
     val producer1 = new Producer[String, String](producerConfig1)
@@ -114,7 +116,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     }
 
     val props2 = new util.Properties()
-    props2.put("metadata.broker.list", "localhost:80," + TestUtils.getBrokerListStrFromConfigs(Seq( config1)))
+    props2.put("metadata.broker.list", "localhost:80:0," + TestUtils.getBrokerListStrFromConfigs(Seq( config1)))
     props2.put("serializer.class", "kafka.serializer.StringEncoder")
     val producerConfig2= new ProducerConfig(props2)
     val producer2 = new Producer[String, String](producerConfig2)
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 70e4b51..e0cff39 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -35,8 +35,11 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port1 = TestUtils.choosePort()
   val port2 = TestUtils.choosePort()
 
-  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
-  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  val rackId1 = 0
+  val rackId2 = 1
+
+  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1, rackId1)
+  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2, rackId2)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   var staleControllerEpochDetected = false
@@ -122,8 +125,9 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // start another controller
     val controllerId = 2
-    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
-    val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
+    val controllerRackId = 2
+    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort(), controllerRackId))
+    val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port, s.config.rackId))
     val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 947e795..05b5afc 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -32,7 +32,7 @@ import kafka.utils.{TestUtils, Utils}
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
-  val props = TestUtils.createBrokerConfig(0, port)
+  val props = TestUtils.createBrokerConfig(0, port, 0)
   val config = new KafkaConfig(props)
 
   val host = "localhost"
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a06cfff..34d3721 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -113,19 +113,20 @@ object TestUtils extends Logging {
    */
   def createBrokerConfigs(numConfigs: Int): List[Properties] = {
     for((port, node) <- choosePorts(numConfigs).zipWithIndex)
-    yield createBrokerConfig(node, port)
+    yield createBrokerConfig(node, port, node)
   }
 
   def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
-    configs.map(c => c.hostName + ":" + c.port).mkString(",")
+    configs.map(c => c.hostName + ":" + c.port + ":" + c.rackId).mkString(",")
   }
 
   /**
    * Create a test config for the given node id
    */
-  def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+  def createBrokerConfig(nodeId: Int, port: Int, rack: Int): Properties = {
     val props = new Properties
     props.put("broker.id", nodeId.toString)
+    props.put("broker.rack", rack.toString)
     props.put("host.name", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
@@ -344,14 +345,14 @@ object TestUtils extends Logging {
     }
   }
 
-  def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost", 6667))
-    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, 6000, jmxPort = -1))
+  def createBrokersInZk(zkClient: ZkClient, ids: Seq[(Int, Int)]): Seq[Broker] = {
+    val brokers = ids.map(id => new Broker(id._1, "localhost", 6667, id._2))
+    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.port, b.rack, 6000, jmxPort = -1))
     brokers
   }
 
-  def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost", 6667))
+  def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[(Int, Int)]): Seq[Broker] = {
+    val brokers = ids.map(id => new Broker(id._1, "localhost", 6667, id._2))
     brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b))
     brokers
   }
-- 
1.8.0

