diff --git a/config/log4j.properties b/config/log4j.properties
index b76bc94..586afae 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -48,7 +48,7 @@ log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 #log4j.logger.kafka.perf=DEBUG, kafkaAppender
 #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
 #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
-log4j.logger.kafka=INFO, kafkaAppender
+log4j.logger.kafka=INFO, stdout
 
 log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
 log4j.additivity.kafka.network.RequestChannel$=false
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index c7652ad..41cb764 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,16 +18,12 @@
 package kafka.admin
 
 import java.util.Random
-import kafka.api.{TopicMetadata, PartitionMetadata}
-import kafka.cluster.Broker
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
-import mutable.ListBuffer
 import scala.collection.mutable
 import kafka.common._
-import scala.Some
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -89,87 +85,6 @@ object AdminUtils extends Logging {
     }
   }
 
-  def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
-    fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
-
-  def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
-    val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
-    topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
-  }
-
-  private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
-    if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
-      val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
-      val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
-      val partitionMetadata = sortedPartitions.map { partitionMap =>
-        val partition = partitionMap._1
-        val replicas = partitionMap._2
-        val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
-        val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
-        debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
-
-        var leaderInfo: Option[Broker] = None
-        var replicaInfo: Seq[Broker] = Nil
-        var isrInfo: Seq[Broker] = Nil
-        try {
-          leaderInfo = leader match {
-            case Some(l) =>
-              try {
-                Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
-              } catch {
-                case e => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
-              }
-            case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
-          }
-          try {
-            replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
-            isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
-          } catch {
-            case e => throw new ReplicaNotAvailableException(e)
-          }
-          if(replicaInfo.size < replicas.size)
-            throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-              replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-          if(isrInfo.size < inSyncReplicas.size)
-            throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-              inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-          new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-        } catch {
-          case e =>
-            debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
-            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-        }
-      }
-      new TopicMetadata(topic, partitionMetadata)
-    } else {
-      // topic doesn't exist, send appropriate error code
-      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
-    }
-  }
-
-  private def getBrokerInfoFromCache(zkClient: ZkClient,
-                                     cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
-                                     brokerIds: Seq[Int]): Seq[Broker] = {
-    var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
-    val brokerMetadata = brokerIds.map { id =>
-      val optionalBrokerInfo = cachedBrokerInfo.get(id)
-      optionalBrokerInfo match {
-        case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
-        case None => // fetch it from zookeeper
-          ZkUtils.getBrokerInfo(zkClient, id) match {
-            case Some(brokerInfo) =>
-              cachedBrokerInfo += (id -> brokerInfo)
-              Some(brokerInfo)
-            case None =>
-              failedBrokerIds += id
-              None
-          }
-      }
-    }
-    brokerMetadata.filter(_.isDefined).map(_.get)
-  }
-
   private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
     val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
     (firstReplicaIndex + shift) % nBrokers
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 025d3ab..ebe4845 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -47,7 +47,7 @@ object ClientUtils extends Logging{
         producer.close()
       }
     }
-    if(!fetchMetaDataSucceeded){
+    if(!fetchMetaDataSucceeded) {
       throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)
     } else {
       debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
@@ -62,13 +62,14 @@ object ClientUtils extends Logging{
    * @param clientId The client's identifier
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
+                         correlationId: Int = 0): TopicMetadataResponse = {
     val props = new Properties()
     props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
     props.put("client.id", clientId)
     props.put("request.timeout.ms", timeoutMs.toString)
     val producerConfig = new ProducerConfig(props)
-    fetchTopicMetadata(topics, brokers, producerConfig, 0)
+    fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/common/ControllerNotAvailableException.scala b/core/src/main/scala/kafka/common/ControllerNotAvailableException.scala
new file mode 100644
index 0000000..7335e9c
--- /dev/null
+++ b/core/src/main/scala/kafka/common/ControllerNotAvailableException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Broker sends this error code to the client when it receives a metadata request but no controller is available
+ * to serve the metadata
+ */
+class ControllerNotAvailableException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this() = this(null, null)
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index c8769e0..233930c 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -41,6 +41,7 @@ object ErrorMapping {
   val ReplicaNotAvailableCode: Short = 9
   val MessageSizeTooLargeCode: Short = 10
   val StaleControllerEpochCode: Short = 11
+  val ControllerNotAvailable: Short = 12
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -54,7 +55,8 @@ object ErrorMapping {
       classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
       classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
-      classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode
+      classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode,
+      classOf[ControllerNotAvailableException].asInstanceOf[Class[Throwable]] -> ControllerNotAvailable
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index c6250dc..b4a257f 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -26,8 +26,9 @@ import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.client.ClientUtils
+import java.util.concurrent.atomic.AtomicInteger
 
 /**
  *  Usage:
@@ -44,6 +45,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private val lock = new ReentrantLock
   private val cond = lock.newCondition()
   private var leaderFinderThread: ShutdownableThread = null
+  private val correlationId = new AtomicInteger(0)
 
   private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when leader is available
@@ -61,22 +63,37 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                               brokers,
                                                               config.clientId,
-                                                              config.socketTimeoutMs).topicsMetadata
-          val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
-          topicsMetadata.foreach(
-            tmd => {
-              val topic = tmd.topic
-              tmd.partitionsMetadata.foreach(
-              pmd => {
-                val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
-                if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
-                  val leaderBroker = pmd.leader.get
-                  leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
+                                                              config.socketTimeoutMs,
+                                                              correlationId.getAndIncrement).topicsMetadata
+          topicsMetadata.foreach { topicMetadata =>
+            topicMetadata.errorCode match {
+              case ErrorMapping.NoError =>
+                topicMetadata.partitionsMetadata.foreach { partitionMetadata =>
+                  partitionMetadata.errorCode match {
+                    case ErrorMapping.NoError => debug("Leader for partition [%s,%d] is %d".format(topicMetadata.topic,
+                      partitionMetadata.partitionId, partitionMetadata.leader.get.id))
+                    case _ => debug("Leader for partition [%s,%d] is not available due to %s".format(topicMetadata.topic,
+                      partitionMetadata.partitionId, ErrorMapping.exceptionFor(partitionMetadata.errorCode).getClass.getName))
+                  }
                 }
-              })
-            })
+              case _ =>
+                debug("No partition metadata for topic %s due to %s".format(topicMetadata.topic,
+                  ErrorMapping.exceptionFor(topicMetadata.errorCode).getClass.getName))
+            }
+          }
+          val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
+          topicsMetadata.foreach { tmd =>
+            val topic = tmd.topic
+            tmd.partitionsMetadata.foreach { pmd =>
+              val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
+              if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
+                val leaderBroker = pmd.leader.get
+                leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
+              }
+            }
+          }
 
-          leaderForPartitionsMap.foreach{
+          leaderForPartitionsMap.foreach {
             case(topicAndPartition, leaderBroker) =>
               val pti = partitionMap(topicAndPartition)
               try {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 5f9c902..1270e92 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -20,9 +20,8 @@ package kafka.consumer
 import kafka.cluster.Broker
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, FetchResponsePartitionData}
+import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData}
 import kafka.common.TopicAndPartition
-import kafka.common.ErrorMapping
 
 
 class ConsumerFetcherThread(name: String,
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 1fbdfc3..bdeee91 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -20,11 +20,7 @@ package kafka.consumer
 import kafka.api._
 import kafka.network._
 import kafka.utils._
-import kafka.utils.ZkUtils._
-import collection.immutable
-import kafka.common.{ErrorMapping, TopicAndPartition, KafkaException}
-import org.I0Itec.zkclient.ZkClient
-import kafka.cluster.Broker
+import kafka.common.{ErrorMapping, TopicAndPartition}
 
 /**
  * A consumer of kafka messages
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 398618f..e66680b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -303,6 +303,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
+    private val correlationId = new AtomicInteger(0)
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
@@ -403,9 +404,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
                                                           brokers,
                                                           config.clientId,
-                                                          config.socketTimeoutMs).topicsMetadata
+                                                          config.socketTimeoutMs,
+                                                          correlationId.getAndIncrement).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
-      topicsMetadata.foreach(m =>{
+      topicsMetadata.foreach(m => {
         val topic = m.topic
         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
         partitionsPerTopicMap.put(topic, partitions)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 3164f78..b9db25d 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -31,6 +31,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
   private val brokerLock = new Object
   this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
 
+  info("Controller %d trying to connect to live brokers %s on startup".format(config.brokerId,controllerContext.liveBrokerIds.mkString(",")))
   controllerContext.liveBrokers.foreach(addNewBroker(_))
 
   def startup() = {
@@ -75,6 +76,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
+    trace("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id))
     val channel = new BlockingChannel(broker.host, broker.port,
       BlockingChannel.UseDefaultBufferSize,
       BlockingChannel.UseDefaultBufferSize,
@@ -218,6 +220,12 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       }
     }
   }
+
+  def sendEmptyLeaderAndIsrRequestToNewBrokers(controllerEpoch: Int, correlationId: Int, brokers: Set[Int]) {
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(Map.empty, Set.empty[Broker], controllerId,
+      controllerEpoch, correlationId, clientId)
+    brokers.foreach(brokerId => sendRequest(brokerId, leaderAndIsrRequest, null))
+  }
 }
 
 case class ControllerBrokerStateInfo(channel: BlockingChannel,
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 02510bd..eae83ea 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -32,13 +32,14 @@ import kafka.utils.{Utils, ZkUtils, Logging}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
+import java.util.concurrent.atomic.AtomicInteger
 import scala.Some
 import kafka.common.TopicAndPartition
-import java.util.concurrent.atomic.AtomicInteger
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
+                        val allBrokers: mutable.Map[Int, Broker] = mutable.Map.empty,
                         var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
                         val brokerShutdownLock: Object = new Object,
                         var epoch: Int = KafkaController.InitialControllerEpoch - 1,
@@ -275,9 +276,33 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
 
     val newBrokersSet = newBrokers.toSet
+    var replicasOnNewBrokers = new mutable.HashSet[PartitionAndReplica]()
+    // send an empty leader and isr request to newly restarted brokers who don't have any partitions assigned yet.
+    // This enables the broker to handle metadata requests
+    val newBrokersWithNoReplicas = newBrokers.filter { broker =>
+      // find all the replicas that this broker is supposed to host
+      val replicasOnThisBroker = getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, Seq(broker))
+      replicasOnNewBrokers ++= replicasOnThisBroker
+
+      replicasOnThisBroker.size match {
+        case 0 => true
+        case _ => false
+      }
+    }.toSet
+    if(newBrokersWithNoReplicas.size > 0) {
+      info("Newly started brokers %s do not have any partitions. Sending them empty leader and isr request"
+        .format(newBrokers.mkString(",")))
+      // if there are no replicas on the new brokers, send them an empty leader and isr request notifying them of the
+      // controller broker. This allows them to serve metadata requests
+      val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, config.brokerId, clientId)
+      brokerRequestBatch.sendEmptyLeaderAndIsrRequestToNewBrokers(epoch, controllerContext.correlationId.getAndIncrement,
+        newBrokersWithNoReplicas)
+    }
+
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
+    replicaStateMachine.handleStateChanges(replicasOnNewBrokers, OnlineReplica)
+
     // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
     // to see if these brokers can become leaders for some/all of those
     partitionStateMachine.triggerOnlinePartitionStateChange()
@@ -475,6 +500,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   private def initializeControllerContext() {
     controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+    controllerContext.allBrokers ++= controllerContext.liveBrokers.map(b => (b.id -> b))
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
     controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
@@ -673,7 +699,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch] = {
     val topicAndPartition = TopicAndPartition(topic, partition)
-    debug("Removing replica %d from ISR of %s.".format(replicaId, topicAndPartition))
+    debug("Removing replica %d from ISR %s for partition %s.".format(replicaId,
+      controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr, topicAndPartition))
     var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
@@ -701,6 +728,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             newLeaderAndIsr.zkVersion = newVersion
 
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
+            controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
             if (updateSucceeded)
               info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString()))
             updateSucceeded
@@ -708,6 +736,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
                  .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
+            controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
             true
           }
         case None =>
@@ -718,6 +747,57 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     finalLeaderIsrAndControllerEpoch
   }
 
+  def getTopicMetadata(topics: Set[String]): Set[TopicMetadata] = {
+    topics.map { topic =>
+      if(controllerContext.allTopics.contains(topic)) {
+        val partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => p._1.topic.equals(topic))
+        val sortedPartitions = partitionReplicaAssignment.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
+        val partitionMetadata = sortedPartitions.map { partitionReplicaMap =>
+          val partition = partitionReplicaMap._1.partition
+          val topicAndPartition = TopicAndPartition(topic, partition)
+          val replicas = partitionReplicaMap._2
+          var replicaInfo: Seq[Broker] = replicas.map(controllerContext.allBrokers.getOrElse(_, null)).filter(_ != null)
+          val leaderAndIsrOpt = controllerContext.partitionLeadershipInfo.get(topicAndPartition)
+          var leaderInfo: Option[Broker] = None
+          var isrInfo: Seq[Broker] = Nil
+          leaderAndIsrOpt match {
+            case Some(leaderIsrAndEpoch) =>
+              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+              debug("[%s,%d]".format(topic,partition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+              try {
+                if(controllerContext.liveBrokerIds.contains(leader))
+                  leaderInfo = Some(controllerContext.allBrokers(leader))
+                else throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition))
+                isrInfo = isr.map(controllerContext.allBrokers.getOrElse(_, null)).filter(_ != null)
+                if(replicaInfo.size < replicas.size)
+                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                if(isrInfo.size < isr.size)
+                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+              } catch {
+                case e =>
+                  error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
+                  new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
+                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+              }
+            case None => // it is possible that for a newly created topic/partition, its replicas are assigned, but a
+              // leader hasn't been assigned yet
+              debug("[%s,%d]".format(topic,partition) + ";replicas = " + replicas + ", in sync replicas = None, leader = None")
+              new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.LeaderNotAvailableCode)
+          }
+        }
+        new TopicMetadata(topic, partitionMetadata)
+      } else {
+        // topic doesn't exist, send appropriate error code
+        new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+      }
+    }
+  }
+
+
   class SessionExpirationListener() extends IZkStateListener with Logging {
     this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
     @throws(classOf[Exception])
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index c017727..76f1eba 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -233,7 +233,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param topicAndPartition   The topic/partition whose leader and isr path is to be initialized
    */
   private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
-    debug("Initializing leader and isr for partition %s".format(topicAndPartition))
     val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
     liveAssignedReplicas.size match {
@@ -249,6 +248,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val leader = liveAssignedReplicas.head
         val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
           controller.epoch)
+        debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index bea1644..5b66c1b 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -245,7 +245,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             try {
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet
               val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
-              val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+              val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
+              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
+              controllerContext.allBrokers ++= newBrokers.map(b => (b.id -> b))
               val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
               controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
               info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7b8d1f0..1437496 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -49,7 +49,7 @@ object RequestChannel extends Logging {
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer = null
     private val requestLogger = Logger.getLogger("kafka.request.logger")
-    trace("Received request : %s".format(requestObj))
+    trace("Processor %d received request : %s".format(processor, requestObj))
 
     def updateRequestMetrics() {
       val endTimeMs = SystemTime.milliseconds
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 5a44c28..d5bd143 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -61,20 +61,19 @@ class SocketServer(val brokerId: Int,
     this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize)
     Utils.newThread("kafka-acceptor", acceptor, false).start()
     acceptor.awaitStartup
-    info("started")
+    info("Started")
   }
 
   /**
    * Shutdown the socket server
    */
   def shutdown() = {
-    info("shutting down")
+    info("Shutting down")
     if(acceptor != null)
       acceptor.shutdown()
     for(processor <- processors)
       processor.shutdown()
-    requestChannel.shutdown
-    info("shut down completely")
+    info("Shutdown completed")
   }
 }
 
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 82e6e4d..9fd6649 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -22,12 +22,14 @@ import kafka.common.KafkaException
 import kafka.utils.Logging
 import kafka.common.ErrorMapping
 import kafka.client.ClientUtils
+import org.apache.log4j.Logger
 
 
 class BrokerPartitionInfo(producerConfig: ProducerConfig,
                           producerPool: ProducerPool,
                           topicPartitionInfo: HashMap[String, TopicMetadata])
         extends Logging {
+  val metadataRequestLogger = Logger.getLogger("metadata.request.logger")
   val brokerList = producerConfig.brokerList
   val brokers = ClientUtils.parseBrokerList(brokerList)
 
@@ -54,6 +56,13 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
           }
       }
     val partitionMetadata = metadata.partitionsMetadata
+    if(partitionMetadata.size == 0) {
+      if(metadata.errorCode != ErrorMapping.NoError) {
+        throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
+      } else {
+        throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
+      }
+    }
     partitionMetadata.map { m =>
       m.leader match {
         case Some(leader) =>
@@ -76,14 +85,15 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
     topicsMetadata = topicMetadataResponse.topicsMetadata
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{
-      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
-      if(tmd.errorCode == ErrorMapping.NoError){
+      if(metadataRequestLogger.isTraceEnabled)
+        metadataRequestLogger.trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
+      if(tmd.errorCode == ErrorMapping.NoError) {
         topicPartitionInfo.put(tmd.topic, tmd)
       } else
-        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
+        metadataRequestLogger.warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
       tmd.partitionsMetadata.foreach(pmd =>{
         if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
-          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
+          metadataRequestLogger.warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
             ErrorMapping.exceptionFor(pmd.errorCode).getClass))
         } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
       })
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 89cb27d..1a74951 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -52,7 +52,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   def handle(events: Seq[KeyedMessage[K,V]]) {
     lock synchronized {
       val serializedData = serialize(events)
-      serializedData.foreach{
+      serializedData.foreach {
         keyed =>
           val dataSize = keyed.message.payloadSize
           producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
@@ -61,6 +61,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.messageSendMaxRetries + 1
       val correlationIdStart = correlationId.get()
+      debug("Handling %d events".format(events.size))
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
         topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
         if (topicMetadataRefreshInterval >= 0 &&
@@ -70,7 +71,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           lastTopicMetadataRefreshTime = SystemTime.milliseconds
         }
         outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
-        if (outstandingProduceRequests.size > 0)  {
+        if (outstandingProduceRequests.size > 0) {
+          info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
           // back off and update the topic metadata cache before attempting another send operation
           Thread.sleep(config.retryBackoffMs)
           // get topics of the outstanding produce requests and refresh metadata for those
@@ -177,7 +179,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
       case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None
       case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None
-      case oe => error("Failed to collate messages by topic, partition due to", oe); throw oe
+      case oe => error("Failed to collate messages by topic, partition due to", oe); None
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f5288bf..3683403 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,11 +17,10 @@
 
 package kafka.server
 
-import kafka.admin.{CreateTopicCommand, AdminUtils}
+import kafka.admin.CreateTopicCommand
 import kafka.api._
 import kafka.message._
 import kafka.network._
-import org.apache.log4j.Logger
 import scala.collection._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
@@ -30,12 +29,16 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.common._
 import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
+import kafka.controller.KafkaController
+import kafka.cluster.Broker
+import kafka.client.ClientUtils
 
 
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val requestChannel: RequestChannel,
+class KafkaApis(val controller: KafkaController,
+                val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
                 val zkClient: ZkClient,
                 brokerId: Int) extends Logging {
@@ -45,7 +48,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val fetchRequestPurgatory =
     new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
   private val delayedRequestMetrics = new DelayedRequestMetrics
-
+  private var controllerBroker: Broker = null
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   /**
@@ -73,6 +76,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
     val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
+    controllerBroker = ZkUtils.getBrokerInfo(zkClient, leaderAndIsrRequest.controllerId).getOrElse(null)
     try {
       val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
       val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
@@ -87,6 +91,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleStopReplicaRequest(request: RequestChannel.Request) {
     val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
+    controllerBroker = ZkUtils.getBrokerInfo(zkClient, stopReplicaRequest.controllerId).getOrElse(null)
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
@@ -390,48 +395,69 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = replicaManager.config
-    val uniqueTopics = {
-      if(metadataRequest.topics.size > 0)
-        metadataRequest.topics.toSet
-      else
-        ZkUtils.getAllTopics(zkClient).toSet
-    }
-    val topicMetadataList = AdminUtils.fetchTopicMetadataFromZk(uniqueTopics, zkClient)
-    topicMetadataList.foreach(
-      topicAndMetadata => {
-        topicAndMetadata.errorCode match {
-          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata
+    var uniqueTopics = Set.empty[String]
+    try {
+      uniqueTopics = {
+        if(metadataRequest.topics.size > 0)
+          metadataRequest.topics.toSet
+        else
+          ZkUtils.getAllTopics(zkClient).toSet
+      }
+      val topicMetadataList =
+      // careful! the following call can be blocking since it tries to acquire the controller lock
+        if(controller.controllerContext.controllerChannelManager != null) {
+          debug("Serving topic metadata request with correlation id %d from client %s locally, since broker %d is the controller"
+            .format(metadataRequest.correlationId, metadataRequest.clientId, config.brokerId))
+          // if broker is the controller, serve it from the cache
+          controller.getTopicMetadata(immutable.Set.empty[String] ++ uniqueTopics).toSeq
+        } else {
+          // if broker is not controller, forward metadata request to the controller
+          // find the broker host/port of the controller
+          if(controllerBroker != null) {
+            debug("Forwarding topic metadata request with correlation id %d from client %s to controller %s"
+              .format(metadataRequest.correlationId, metadataRequest.clientId, controllerBroker))
+            ClientUtils.fetchTopicMetadata(uniqueTopics, Seq(controllerBroker), "broker-"+ config.brokerId,
+              config.metadataRequestTimeoutMs, metadataRequest.correlationId).topicsMetadata
+          } else {
+            debug("Returning empty response to topic metadata request with correlation id %d from client %s since controller is not available"
+              .format(metadataRequest.correlationId, metadataRequest.clientId))
+            // broker has just started and doesn't know who the controller is yet
+            uniqueTopics.map { topic =>
+              new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.ControllerNotAvailable)
+            }.toSeq
+          }
+        }
+      // handle auto create topics
+      topicMetadataList.foreach { topicMetadata =>
+        topicMetadata.errorCode match {
+          case ErrorMapping.NoError => topicsMetadata += topicMetadata
           case ErrorMapping.UnknownTopicOrPartitionCode =>
-            try {
-              /* check if auto creation of topics is turned on */
-              if (config.autoCreateTopicsEnable) {
-                try {
-                  CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
-                  info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                               .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
-                } catch {
-                  case e: TopicExistsException => // let it go, possibly another broker created this topic
-                }
-                val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicAndMetadata.topic, zkClient)
-                topicsMetadata += newTopicMetadata
-                newTopicMetadata.errorCode match {
-                  case ErrorMapping.NoError =>
-                  case _ => throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topicAndMetadata.topic))
-                }
+            if (config.autoCreateTopicsEnable) {
+              try {
+                CreateTopicCommand.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+                info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                  .format(topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+              } catch {
+                case e: TopicExistsException => // let it go, possibly another broker created this topic
               }
-            } catch {
-              case e => error("Error while retrieving topic metadata", e)
+              topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
+            } else {
+              topicsMetadata += topicMetadata
             }
-          case _ => 
-            error("Error while fetching topic metadata for topic " + topicAndMetadata.topic,
-                  ErrorMapping.exceptionFor(topicAndMetadata.errorCode).getCause)
-            topicsMetadata += topicAndMetadata
+          case _ =>
+            error("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic,
+              ErrorMapping.exceptionFor(topicMetadata.errorCode).getClass.getName))
+            topicsMetadata += topicMetadata
         }
-      })
-    trace("Sending topic metadata for correlation id %d to client %s".format(metadataRequest.correlationId, metadataRequest.clientId))
-    topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
-    val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+      }
+    } catch {
+      case e =>
+        error("Failed to serve TopicMetadataRequest %s due to".format(metadataRequest), e)
+    } finally {
+      trace("Sending topic metadata %s for correlation id %d to client %s".format(topicsMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
+      val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    }
   }
 
   def close() {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 549b4b0..ecacf96 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -169,4 +169,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the purge interval (in number of requests) of the producer request purgatory */
   val producerPurgatoryPurgeIntervalRequests = props.getInt("producer.purgatory.purge.interval.requests", 10000)
 
- }
+  /* the timeout that the broker uses to fetch metadata from the controller. This is required to be low in unit tests
+  since if the controller broker is shut down, it will not close the socket to the broker. In this case, the broker must
+  use some timeout. This can be essentially disabled in a real production environment since if a broker dies, the process
+  exits and closes all sockets associated with the process. */
+  val metadataRequestTimeoutMs = props.getInt("metadata.request.timeout.ms", Integer.MAX_VALUE)
+}
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 842dcf3..d4274a7 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -33,14 +33,15 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
       try {
         val req = requestChannel.receiveRequest()
         if(req eq RequestChannel.AllDone) {
-          trace("receives shut down command, shut down".format(brokerId, id))
+          debug("Kafka request handler %d on broker %d received shut down command".format(
+            id, brokerId))
           return
         }
         req.dequeueTimeMs = SystemTime.milliseconds
-        debug("handles request " + req)
+        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
         apis.handle(req)
       } catch {
-        case e: Throwable => error("exception when handling request", e)
+        case e: Throwable => error("Exception when handling request", e)
       }
     }
   }
@@ -55,12 +56,17 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
-  for(i <- 0 until numThreads) { 
-    runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
-    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
-    threads(i).start()
+
+  def startup() {
+    info("Starting up")
+    for(i <- 0 until numThreads) {
+      runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
+      threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
+      threads(i).start()
+    }
+    info("Startup complete")
   }
-  
+
   def shutdown() {
     info("shutting down")
     for(handler <- runnables)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b4a57c6..543a212 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -82,8 +82,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
 
     kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
-    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId)
+    apis = new KafkaApis(kafkaController, socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+    requestHandlerPool.startup()
     Mx4jLoader.maybeLoad
 
     // start the replica manager
@@ -112,6 +113,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     info("shutting down")
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
+      if(kafkaZookeeper != null)
+        Utils.swallow(kafkaZookeeper.shutdown())
       if(socketServer != null)
         Utils.swallow(socketServer.shutdown())
       if(requestHandlerPool != null)
@@ -119,8 +122,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       Utils.swallow(kafkaScheduler.shutdown())
       if(apis != null)
         Utils.swallow(apis.close())
-      if(kafkaZookeeper != null)
-        Utils.swallow(kafkaZookeeper.shutdown())
       if(replicaManager != null)
         Utils.swallow(replicaManager.shutdown())
       if(logManager != null)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 03f621a..a84da13 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -19,8 +19,8 @@ package kafka.server
 
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
-import kafka.common.{KafkaStorageException, TopicAndPartition, ErrorMapping}
+import kafka.api.{OffsetRequest, FetchResponsePartitionData}
+import kafka.common.{KafkaStorageException, TopicAndPartition}
 
 class ReplicaFetcherThread(name:String,
                            sourceBroker: Broker,
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index b0a0e09..250d07b 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -160,8 +160,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
-    val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList
+    val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
     for(i <- 0 until actualReplicaList.size)
       assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
@@ -176,39 +175,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def testGetTopicMetadata() {
-    val expectedReplicaAssignment = Map(
-      0 -> List(0, 1, 2),
-      1 -> List(1, 2, 3)
-    )
-    val leaderForPartitionMap = Map(
-      0 -> 0,
-      1 -> 1
-    )
-    val topic = "auto-topic"
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-    // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-
-    val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-    newTopicMetadata.errorCode match {
-      case ErrorMapping.UnknownTopicOrPartitionCode =>
-        fail("Topic " + topic + " should've been automatically created")
-      case _ =>
-        assertEquals(topic, newTopicMetadata.topic)
-        assertNotNull("partition metadata list cannot be null", newTopicMetadata.partitionsMetadata)
-        assertEquals("partition metadata list length should be 2", 2, newTopicMetadata.partitionsMetadata.size)
-        val actualReplicaAssignment = newTopicMetadata.partitionsMetadata.map(p => p.replicas)
-        val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList
-        assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
-        for(i <- 0 until actualReplicaList.size) {
-          assertEquals(expectedReplicaAssignment(i), actualReplicaList(i))
-        }
-    }
-  }
-
-  @Test
   def testPartitionReassignmentWithLeaderInNewReplicas() {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
@@ -360,49 +326,41 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   @Test
   def testShutdownBroker() {
-    info("inside testShutdownBroker")
     val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
     val topic = "test"
     val partition = 1
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
+    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
 
-    // broker 2 should be the leader since it was started first
-    var leaderBeforeShutdown = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
     var controllerId = ZkUtils.getController(zkClient)
     var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
     var partitionsRemaining = controller.shutdownBroker(2)
     try {
       assertEquals(0, partitionsRemaining)
-      var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+      var topicMetadata = controller.getTopicMetadata(Set(topic)).head
       var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-      assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-      // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
-      assertEquals(2, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      assertEquals(0, leaderAfterShutdown)
+      assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
+      assertEquals(List(0,1), topicMetadata.partitionsMetadata.head.isr.map(_.id))
 
-      leaderBeforeShutdown = leaderAfterShutdown
       controllerId = ZkUtils.getController(zkClient)
       controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining)
-      topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
+      topicMetadata = controller.getTopicMetadata(Set(topic)).head
       leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-      assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-      // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
-      assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      assertEquals(0, leaderAfterShutdown)
 
-      leaderBeforeShutdown = leaderAfterShutdown
       controllerId = ZkUtils.getController(zkClient)
       controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining)
-      topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-      leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-      assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
-      assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      assertTrue(TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).isDefined)
+      topicMetadata = controller.getTopicMetadata(Set(topic)).head
+      assertEquals(ErrorMapping.LeaderNotAvailableCode, topicMetadata.partitionsMetadata.head.errorCode)
     }
     finally {
       servers.foreach(_.shutdown())
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 4d989e4..90af26e 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -23,7 +23,6 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import scala.collection._
 import org.scalatest.junit.JUnit3Suite
-import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer._
 import kafka.admin.CreateTopicCommand
@@ -31,6 +30,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 import java.util.{Collections, Properties}
+import org.apache.log4j.{Logger, Level}
 import kafka.utils.TestUtils._
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
@@ -65,7 +65,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def testBasic() {
     val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
-    requestHandlerLogger.setLevel(Level.FATAL)
+//    requestHandlerLogger.setLevel(Level.FATAL)
 
     // test consumer timeout logic
     val consumerConfig0 = new ConsumerConfig(
@@ -142,7 +142,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
-    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ 
+    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
                         sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
@@ -167,7 +167,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ 
+    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
                         sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
@@ -240,7 +240,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def testCompressionSetConsumption() {
     // send some messages to each broker
-    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ 
+    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++
                        sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
 
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
@@ -263,7 +263,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ 
+    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++
                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
 
     val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -294,39 +294,41 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
-  def testLeaderSelectionForPartition() {
-    val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
-
-    // create topic topic1 with 1 partition on broker 0
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
-
-    // send some messages to each broker
-    val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
-
-    // create a consumer
-    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
-    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
-    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
-    val topicRegistry = zkConsumerConnector1.getTopicRegistry
-    assertEquals(1, topicRegistry.map(r => r._1).size)
-    assertEquals(topic, topicRegistry.map(r => r._1).head)
-    val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2)))
-    val brokerPartition = topicsAndPartitionsInRegistry.head._2.head
-    assertEquals(0, brokerPartition.partitionId)
-
-    // also check partition ownership
-    val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
-    val expected_1 = List( ("0", "group1_consumer1-0"))
-    assertEquals(expected_1, actual_1)
-
-    val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
-    assertEquals(sentMessages1, receivedMessages1)
-  }
-
-  def sendMessagesToBrokerPartition(config: KafkaConfig, 
-                                    topic: String, 
-                                    partition: Int, 
-                                    numMessages: Int, 
+//  def testLeaderSelectionForPartition() {
+//    val zkClient = new ZkClient(zookeeperConnect, 6000, 30000, ZKStringSerializer)
+//
+//    // create topic topic1 with 1 partition on broker 0
+//    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+//
+//    // send some messages to each broker
+//    val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
+//
+//    // create a consumer
+//    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+//    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+//    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
+//    val topicRegistry = zkConsumerConnector1.getTopicRegistry
+//    assertEquals(1, topicRegistry.map(r => r._1).size)
+//    assertEquals(topic, topicRegistry.map(r => r._1).head)
+//    val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2)))
+//    val brokerPartition = topicsAndPartitionsInRegistry.head._2.head
+//    assertEquals(0, brokerPartition.partitionId)
+//
+//    // also check partition ownership
+//    val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
+//    val expected_1 = List( ("0", "group1_consumer1-0"))
+//    assertEquals(expected_1, actual_1)
+//
+//    val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
+//    assertEquals(sentMessages1, receivedMessages1)
+//    zkConsumerConnector1.shutdown()
+//    zkClient.close()
+//  }
+
+  def sendMessagesToBrokerPartition(config: KafkaConfig,
+                                    topic: String,
+                                    partition: Int,
+                                    numMessages: Int,
                                     compression: CompressionCodec = NoCompressionCodec): List[String] = {
     val header = "test-%d-%d".format(config.brokerId, partition)
     val props = new Properties()
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 2fc08d3..fb5ede2 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -27,15 +27,15 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.CreateTopicCommand
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{ZkUtils, TestUtils, Utils}
 
 /**
  * End to end tests of the primitive apis against a local server
  */
 class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
-  
+
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props)
@@ -300,8 +300,10 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   def testConsumerEmptyTopic() {
     val newTopic = "new-topic"
     CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
+    var controllerId = ZkUtils.getController(zkClient)
+    var controller = servers.find(s => s.config.brokerId == controllerId).get.kafkaController
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      controller.getTopicMetadata(immutable.Set(newTopic)).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 6db63ba..b1149c3 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -22,26 +22,27 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
 import java.nio.ByteBuffer
 import junit.framework.Assert._
-import org.easymock.EasyMock
-import kafka.network._
 import kafka.cluster.Broker
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
-import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
+import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.api.TopicMetadataRequest
 import kafka.common.ErrorMapping
-import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetadataRequest}
+import kafka.client.ClientUtils
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p))
-  var brokers: Seq[Broker] = null
+  private var server1: KafkaServer = null
+  val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))
 
   override def setUp() {
     super.setUp()
-    brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
+    server1 = TestUtils.createServer(configs.head)
   }
 
   override def tearDown() {
+    server1.shutdown()
     super.tearDown()
   }
 
@@ -65,77 +66,78 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create topic
     val topic = "test"
     CreateTopicCommand.createTopic(zkClient, topic, 1)
-    // set up leader for topic partition 0
-    val leaderForPartitionMap = Map(
-      0 -> configs.head.brokerId
-    )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
-    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
+    var partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
     assertEquals(1, partitionMetadata.head.replicas.size)
+
+    // wait for leader to be elected
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+
+    // retry the metadata for the auto created topic
+    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
   }
 
   def testGetAllTopicMetadata {
     // create topic
-    val topic = "test"
-    CreateTopicCommand.createTopic(zkClient, topic, 1)
-    // set up leader for topic partition 0
-    val leaderForPartitionMap = Map(
-      0 -> configs.head.brokerId
-    )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val topicMetadataRequest = new TopicMetadataRequest(List(), 0)
-    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
+    val topic1 = "testGetAllTopicMetadata1"
+    val topic2 = "testGetAllTopicMetadata2"
+    CreateTopicCommand.createTopic(zkClient, topic1, 1)
+    CreateTopicCommand.createTopic(zkClient, topic2, 1)
+
+    // wait for leader to be elected for both topics
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0, 1000)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 0, 1000)
+
+    // issue metadata request with empty list of topics
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata",
+      2000, 0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(2, topicsMetadata.size)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.last.partitionsMetadata.head.errorCode)
+    val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
+    val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic1.head.partitionId)
+    assertEquals(1, partitionMetadataTopic1.head.replicas.size)
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
+    assertEquals(1, partitionMetadataTopic2.head.replicas.size)
   }
 
   def testAutoCreateTopic {
     // auto create topic
-    val topic = "test"
-
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
-    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    val topic = "testAutoCreateTopic"
+
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
+    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
+
+    // wait for leader to be elected
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+
+    // retry the metadata for the auto created topic
+    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    var partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(0, partitionMetadata.head.replicas.size)
-    assertEquals(None, partitionMetadata.head.leader)
-    assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode)
-  }
-
-  private def mockLogManagerAndTestTopic(request: TopicMetadataRequest): Seq[TopicMetadata] = {
-    // topic metadata request only requires 1 call from the replica manager
-    val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
-    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
-    EasyMock.replay(replicaManager)
-
-
-    val serializedMetadataRequest = TestUtils.createRequestByteBuffer(request)
-
-    // create the kafka request handler
-    val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
-
-    // call the API (to be tested) to get metadata
-    apis.handleTopicMetadataRequest(new RequestChannel.Request
-      (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeMs=1))
-    val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
-    
-    // check assertions
-    val topicMetadata = TopicMetadataResponse.readFrom(metadataResponse).topicsMetadata
-
-    topicMetadata
+    assertEquals(1, partitionMetadata.head.replicas.size)
   }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 7f7a8d7..a70e2ac 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -260,10 +260,11 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          topicPartitionInfos = topicPartitionInfos)
     try {
       handler.partitionAndCollate(producerDataList)
-      fail("Should fail with UnknownTopicOrPartitionException")
     }
     catch {
-      case e: UnknownTopicOrPartitionException => // expected, do nothing
+      // should not throw UnknownTopicOrPartitionException to allow resend
+      case e: UnknownTopicOrPartitionException => fail("Should not throw UnknownTopicOrPartitionException")
+
     }
   }
 
@@ -291,10 +292,10 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          topicPartitionInfos = topicPartitionInfos)
     try {
       handler.handle(producerDataList)
-      fail("Should fail with NoBrokersForPartitionException")
+      fail("Should fail with FailedToSendMessageException")
     }
     catch {
-      case e: NoBrokersForPartitionException => // expected, do nothing
+      case e: FailedToSendMessageException => // we retry on any exception now
     }
   }
 
@@ -418,6 +419,8 @@ class AsyncProducerTest extends JUnit3Suite {
     val response2 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
+    // don't care about config mock
+    EasyMock.expect(mockSyncProducer.config).andReturn(null).anyTimes()
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
     EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1)
     EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2)
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index bc37531..fe3beaa 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -27,10 +27,10 @@ import org.junit.Assert._
 import org.junit.Test
 import kafka.utils._
 import java.util
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.CreateTopicCommand
 import util.Properties
 import kafka.api.FetchRequestBuilder
-import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException}
+import kafka.common.{ErrorMapping, FailedToSendMessageException}
 
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
@@ -43,6 +43,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
+  private var servers = List.empty[KafkaServer]
 
   private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
   private val config1 = new KafkaConfig(props1) {
@@ -60,6 +61,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     // set up 2 brokers with 4 partitions each
     server1 = TestUtils.createServer(config1)
     server2 = TestUtils.createServer(config2)
+    servers = List(server1,server2)
 
     val props = new Properties()
     props.put("host", "localhost")
@@ -68,7 +70,6 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "")
     consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "")
 
-
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
   }
@@ -88,8 +89,12 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
   def testUpdateBrokerPartitionInfo() {
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+    // get topic metadata from controller
+    var controllerId = ZkUtils.getController(zkClient)
+    var controller = servers.find(s => s.config.brokerId == controllerId).get.kafkaController
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      controller.getTopicMetadata(Set("new-topic")).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
+      zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     val props1 = new util.Properties()
@@ -101,7 +106,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
       fail("Test should fail because the broker list provided are not valid")
     } catch {
-      case e: KafkaException =>
+      case e: FailedToSendMessageException =>
       case oe => fail("fails with exception", oe)
     } finally {
       producer1.close()
@@ -153,8 +158,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     // create topic with 1 partition and await leadership
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+    var controllerId = ZkUtils.getController(zkClient)
+    var controller = servers.find(s => s.config.brokerId == controllerId).get.kafkaController
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      controller.getTopicMetadata(Set("new-topic")).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
+      zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     val producer1 = new Producer[String, String](producerConfig1)
@@ -204,8 +212,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     // create topic
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
+    var controllerId = ZkUtils.getController(zkClient)
+    var controller = servers.find(s => s.config.brokerId == controllerId).get.kafkaController
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      controller.getTopicMetadata(Set("new-topic")).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
+      zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500)
@@ -259,14 +270,18 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("request.timeout.ms", String.valueOf(timeoutMs))
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     props.put("request.required.acks", "1")
-
+    props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout")
+    props.put("message.send.max.retries","1")
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
 
     // create topics in ZK
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+    var controllerId = ZkUtils.getController(zkClient)
+    var controller = servers.find(s => s.config.brokerId == controllerId).get.kafkaController
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      controller.getTopicMetadata(Set("new-topic")).head.errorCode != ErrorMapping.UnknownTopicOrPartitionCode,
+      zookeeper.tickTime))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     // do a simple test to make sure plumbing is okay
@@ -302,6 +317,9 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     // make sure we don't wait fewer than numRetries*timeoutMs milliseconds
     // we do this because the DefaultEventHandler retries a number of times
     assertTrue((t2-t1) >= timeoutMs*config.messageSendMaxRetries)
+    // restart the request handler pool on server 1 so that it can process the outstanding metadata requests. If this
+    // is not done, server 2 can never shut down since it is waiting for some reply from server 1
+    server1.requestHandlerPool.startup()
   }
 }
 
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1557047..11afd30 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -20,7 +20,7 @@ import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.utils.{Time, TestUtils, MockTime}
+import kafka.utils.{ZkUtils, Time, TestUtils, MockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
@@ -28,6 +28,7 @@ import kafka.api._
 import scala.Some
 import org.junit.Assert._
 import kafka.common.TopicAndPartition
+import kafka.controller.KafkaController
 
 
 class SimpleFetchTest extends JUnit3Suite {
@@ -57,7 +58,9 @@ class SimpleFetchTest extends JUnit3Suite {
     val fetchSize = 100
     val messages = new Message("test-message".getBytes())
 
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    // create nice mock since we don't particularly care about zkclient calls
+    val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
+    EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false)
     EasyMock.replay(zkClient)
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
@@ -88,7 +91,8 @@ class SimpleFetchTest extends JUnit3Suite {
     // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
     // don't provide replica or leader callbacks since they will not be tested here
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+    val controller = new KafkaController(configs.head, zkClient)
+    val apis = new KafkaApis(controller, requestChannel, replicaManager, zkClient, configs.head.brokerId)
 
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -151,7 +155,8 @@ class SimpleFetchTest extends JUnit3Suite {
     val followerReplicaId = configs(1).brokerId
     val followerLEO = 15
 
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
+    EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false)
     EasyMock.replay(zkClient)
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
@@ -182,7 +187,8 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.replay(replicaManager)
 
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+    val controller = new KafkaController(configs.head, zkClient)
+    val apis = new KafkaApis(controller, requestChannel, replicaManager, zkClient, configs.head.brokerId)
 
     /**
      * This fetch, coming from a replica, requests all data at offset "15".  Because the request is coming
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f9c9e64..acd1d63 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -130,6 +130,7 @@ object TestUtils extends Logging {
     props.put("log.flush.interval.messages", "1")
     props.put("zookeeper.connect", TestZKUtils.zookeeperConnect)
     props.put("replica.socket.timeout.ms", "1500")
+    props.put("metadata.request.timeout.ms", "5000")
     props
   }
 
