diff --git a/config/log4j.properties b/config/log4j.properties
index b76bc94..586afae 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -48,7 +48,7 @@ log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 #log4j.logger.kafka.perf=DEBUG, kafkaAppender
 #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
 #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
-log4j.logger.kafka=INFO, kafkaAppender
+log4j.logger.kafka=INFO, stdout 
 
 log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
 log4j.additivity.kafka.network.RequestChannel$=false
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index c7652ad..41cb764 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -18,16 +18,12 @@
 package kafka.admin
 
 import java.util.Random
-import kafka.api.{TopicMetadata, PartitionMetadata}
-import kafka.cluster.Broker
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
-import mutable.ListBuffer
 import scala.collection.mutable
 import kafka.common._
-import scala.Some
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -89,87 +85,6 @@ object AdminUtils extends Logging {
     }
   }
 
-  def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
-    fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
-
-  def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
-    val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
-    topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
-  }
-
-  private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
-    if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
-      val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
-      val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
-      val partitionMetadata = sortedPartitions.map { partitionMap =>
-        val partition = partitionMap._1
-        val replicas = partitionMap._2
-        val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
-        val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
-        debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
-
-        var leaderInfo: Option[Broker] = None
-        var replicaInfo: Seq[Broker] = Nil
-        var isrInfo: Seq[Broker] = Nil
-        try {
-          leaderInfo = leader match {
-            case Some(l) =>
-              try {
-                Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
-              } catch {
-                case e => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
-              }
-            case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
-          }
-          try {
-            replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
-            isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
-          } catch {
-            case e => throw new ReplicaNotAvailableException(e)
-          }
-          if(replicaInfo.size < replicas.size)
-            throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-              replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-          if(isrInfo.size < inSyncReplicas.size)
-            throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-              inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-          new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-        } catch {
-          case e =>
-            debug("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
-            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-        }
-      }
-      new TopicMetadata(topic, partitionMetadata)
-    } else {
-      // topic doesn't exist, send appropriate error code
-      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
-    }
-  }
-
-  private def getBrokerInfoFromCache(zkClient: ZkClient,
-                                     cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
-                                     brokerIds: Seq[Int]): Seq[Broker] = {
-    var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
-    val brokerMetadata = brokerIds.map { id =>
-      val optionalBrokerInfo = cachedBrokerInfo.get(id)
-      optionalBrokerInfo match {
-        case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
-        case None => // fetch it from zookeeper
-          ZkUtils.getBrokerInfo(zkClient, id) match {
-            case Some(brokerInfo) =>
-              cachedBrokerInfo += (id -> brokerInfo)
-              Some(brokerInfo)
-            case None =>
-              failedBrokerIds += id
-              None
-          }
-      }
-    }
-    brokerMetadata.filter(_.isDefined).map(_.get)
-  }
-
   private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
     val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
     (firstReplicaIndex + shift) % nBrokers
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 68e64d6..63d3683 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -54,12 +54,15 @@ object PartitionStateInfo {
     val isr = Utils.parseCsvList(isrString).map(_.toInt).toList
     val zkVersion = buffer.getInt
     val replicationFactor = buffer.getInt
+    val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
     PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), controllerEpoch),
-      replicationFactor)
+      replicationFactor, replicas.toSet)
   }
 }
 
-case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, val replicationFactor: Int) {
+case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                              val replicationFactor: Int,
+                              val allReplicas: Set[Int]) {
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(leaderIsrAndControllerEpoch.controllerEpoch)
     buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leader)
@@ -67,6 +70,7 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
     writeShortString(buffer, leaderIsrAndControllerEpoch.leaderAndIsr.isr.mkString(","))
     buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion)
     buffer.putInt(replicationFactor)
+    allReplicas.foreach(buffer.putInt(_))
   }
 
   def sizeInBytes(): Int = {
@@ -76,7 +80,8 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
       4 /* leader epoch */ +
       (2 + leaderIsrAndControllerEpoch.leaderAndIsr.isr.mkString(",").length) +
       4 /* zk version */ +
-      4 /* replication factor */
+      4 /* replication factor */ +
+      allReplicas.size * 4
     size
   }
   
@@ -84,6 +89,7 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
     val partitionStateInfo = new StringBuilder
     partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
     partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
+    partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
     partitionStateInfo.toString()
   }
 }
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
index b4cfae8..378b2b3 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
@@ -63,7 +63,7 @@ case class LeaderAndIsrResponse(override val correlationId: Int,
     buffer.putInt(correlationId)
     buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
-    for ((key:(String, Int), value) <- responseMap){
+    for ((key:(String, Int), value) <- responseMap) {
       writeShortString(buffer, key._1)
       buffer.putInt(key._2)
       buffer.putShort(value)
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index b000eb7..541cf84 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -27,6 +27,7 @@ object RequestKeys {
   val MetadataKey: Short = 3
   val LeaderAndIsrKey: Short = 4
   val StopReplicaKey: Short = 5
+  val UpdateMetadataKey: Short = 6
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -34,7 +35,8 @@ object RequestKeys {
         OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
         MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
         LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
-        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom))
+        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
+        UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom))
 
   def nameForKey(key: Short): String = {
     keyToNameAndDeserializerMap.get(key) match {
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
new file mode 100644
index 0000000..28e057c
--- /dev/null
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -0,0 +1,112 @@
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+
+object UpdateMetadataRequest {
+  val CurrentVersion = 0.shortValue
+  val IsInit: Boolean = true
+  val NotInit: Boolean = false
+  val DefaultAckTimeout: Int = 1000
+
+  def readFrom(buffer: ByteBuffer): UpdateMetadataRequest = {
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+    val ackTimeoutMs = buffer.getInt
+    val controllerId = buffer.getInt
+    val controllerEpoch = buffer.getInt
+    val partitionStateInfosCount = buffer.getInt
+    val partitionStateInfos = new collection.mutable.HashMap[TopicAndPartition, PartitionStateInfo]
+
+    for(i <- 0 until partitionStateInfosCount){
+      val topic = readShortString(buffer)
+      val partition = buffer.getInt
+      val partitionStateInfo = PartitionStateInfo.readFrom(buffer)
+
+      partitionStateInfos.put(TopicAndPartition(topic, partition), partitionStateInfo)
+    }
+
+    val numAliveBrokers = buffer.getInt
+    val aliveBrokers = for(i <- 0 until numAliveBrokers) yield buffer.getInt
+
+    val numBrokers = buffer.getInt
+    val allBrokers = for(i <- 0 until numBrokers) yield Broker.readFrom(buffer)
+    new UpdateMetadataRequest(versionId, correlationId, clientId, ackTimeoutMs, controllerId, controllerEpoch,
+      partitionStateInfos.toMap, aliveBrokers.toSet, allBrokers.toSet)
+  }
+}
+
+case class UpdateMetadataRequest (versionId: Short,
+                                  override val correlationId: Int,
+                                  clientId: String,
+                                  ackTimeoutMs: Int,
+                                  controllerId: Int,
+                                  controllerEpoch: Int,
+                                  partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo],
+                                  aliveBrokers: Set[Int],
+                                  allBrokers: Set[Broker])
+  extends RequestOrResponse(Some(RequestKeys.UpdateMetadataKey), correlationId) {
+
+  def this(controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String,
+           partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo], aliveBrokers: Set[Int], allBrokers: Set[Broker]) = {
+    this(UpdateMetadataRequest.CurrentVersion, correlationId, clientId, UpdateMetadataRequest.DefaultAckTimeout,
+      controllerId, controllerEpoch, partitionStateInfos, aliveBrokers, allBrokers)
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+    buffer.putInt(ackTimeoutMs)
+    buffer.putInt(controllerId)
+    buffer.putInt(controllerEpoch)
+    buffer.putInt(partitionStateInfos.size)
+    for((key, value) <- partitionStateInfos){
+      writeShortString(buffer, key.topic)
+      buffer.putInt(key.partition)
+      value.writeTo(buffer)
+    }
+    buffer.putInt(aliveBrokers.size)
+    aliveBrokers.foreach(buffer.putInt(_))
+    buffer.putInt(allBrokers.size)
+    allBrokers.foreach(_.writeTo(buffer))
+  }
+
+  def sizeInBytes(): Int = {
+    var size =
+      2 /* version id */ +
+        4 /* correlation id */ +
+        (2 + clientId.length) /* client id */ +
+        4 /* ack timeout */ +
+        4 /* controller id */ +
+        4 /* controller epoch */ +
+        4 /* number of partitions */
+    for((key, value) <- partitionStateInfos)
+      size += (2 + key.topic.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
+    size += 4 /* number of alive brokers in the cluster */
+    aliveBrokers.foreach(b => size += 4) /* broker id */
+    size += 4 /* number of brokers in the cluster */
+    for(broker <- allBrokers)
+      size += broker.sizeInBytes /* broker info */
+    size
+  }
+
+  override def toString(): String = {
+    val updateMetadataRequest = new StringBuilder
+    updateMetadataRequest.append("Name:" + this.getClass.getSimpleName)
+    updateMetadataRequest.append(";Version:" + versionId)
+    updateMetadataRequest.append(";Controller:" + controllerId)
+    updateMetadataRequest.append(";ControllerEpoch:" + controllerEpoch)
+    updateMetadataRequest.append(";CorrelationId:" + correlationId)
+    updateMetadataRequest.append(";ClientId:" + clientId)
+    updateMetadataRequest.append(";AckTimeoutMs:" + ackTimeoutMs + " ms")
+    updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
+    updateMetadataRequest.append(";AliveBrokers:" + aliveBrokers.mkString(","))
+    updateMetadataRequest.append(";Brokers:" + allBrokers.mkString(","))
+    updateMetadataRequest.toString()
+  }
+
+}
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
new file mode 100644
index 0000000..31e924b
--- /dev/null
+++ b/core/src/main/scala/kafka/api/UpdateMetadataResponse.scala
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import kafka.common.{TopicAndPartition, ErrorMapping}
+import java.nio.ByteBuffer
+import kafka.api.ApiUtils._
+import collection.mutable.HashMap
+import collection.Map
+
+
+object UpdateMetadataResponse {
+  def readFrom(buffer: ByteBuffer): UpdateMetadataResponse = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+    val numEntries = buffer.getInt
+    val responseMap = new HashMap[TopicAndPartition, Short]()
+    for (i<- 0 until numEntries){
+      val topic = readShortString(buffer)
+      val partition = buffer.getInt
+      val partitionErrorCode = buffer.getShort
+      responseMap.put(TopicAndPartition(topic, partition), partitionErrorCode)
+    }
+    new UpdateMetadataResponse(correlationId, responseMap, errorCode)
+  }
+}
+
+
+case class UpdateMetadataResponse(override val correlationId: Int,
+                                  responseMap: Map[TopicAndPartition, Short],
+                                  errorCode: Short = ErrorMapping.NoError)
+  extends RequestOrResponse(correlationId = correlationId) {
+  def sizeInBytes(): Int ={
+    var size =
+      4 /* correlation id */ +
+        2 /* error code */ +
+        4 /* number of responses */
+    for ((key, value) <- responseMap) {
+      size +=
+        2 + key.topic.length /* topic */ +
+          4 /* partition */ +
+          2 /* error code for this partition */
+    }
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    buffer.putInt(responseMap.size)
+    for ((key, value) <- responseMap) {
+      writeShortString(buffer, key.topic)
+      buffer.putInt(key.partition)
+      buffer.putShort(value)
+    }
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 025d3ab..ebe4845 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -47,7 +47,7 @@ object ClientUtils extends Logging{
         producer.close()
       }
     }
-    if(!fetchMetaDataSucceeded){
+    if(!fetchMetaDataSucceeded) {
       throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)
     } else {
       debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
@@ -62,13 +62,14 @@ object ClientUtils extends Logging{
    * @param clientId The client's identifier
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
+                         correlationId: Int = 0): TopicMetadataResponse = {
     val props = new Properties()
     props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
     props.put("client.id", clientId)
     props.put("request.timeout.ms", timeoutMs.toString)
     val producerConfig = new ProducerConfig(props)
-    fetchTopicMetadata(topics, brokers, producerConfig, 0)
+    fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index c6250dc..a46d737 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -26,8 +26,9 @@ import scala.collection.mutable
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.ZkUtils._
 import kafka.utils.{ShutdownableThread, SystemTime}
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.client.ClientUtils
+import java.util.concurrent.atomic.AtomicInteger
 
 /**
  *  Usage:
@@ -44,6 +45,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private val lock = new ReentrantLock
   private val cond = lock.newCondition()
   private var leaderFinderThread: ShutdownableThread = null
+  private val correlationId = new AtomicInteger(0)
 
   private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when leader is available
@@ -61,22 +63,44 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                               brokers,
                                                               config.clientId,
-                                                              config.socketTimeoutMs).topicsMetadata
-          val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
-          topicsMetadata.foreach(
-            tmd => {
-              val topic = tmd.topic
-              tmd.partitionsMetadata.foreach(
-              pmd => {
-                val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
-                if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
-                  val leaderBroker = pmd.leader.get
-                  leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
+                                                              config.socketTimeoutMs,
+                                                              correlationId.getAndIncrement).topicsMetadata
+          topicsMetadata.foreach { topicMetadata =>
+            topicMetadata.errorCode match {
+              case ErrorMapping.NoError =>
+                topicMetadata.partitionsMetadata.foreach { partitionMetadata =>
+                  partitionMetadata.errorCode match {
+                    case ErrorMapping.NoError =>
+                      debug("Leader for partition [%s,%d] is %d".format(topicMetadata.topic,
+                        partitionMetadata.partitionId, partitionMetadata.leader.get.id))
+                    case ErrorMapping.ReplicaNotAvailableCode =>
+                      // this error message means some replica other than the leader is not available. The consumer
+                      // doesn't care about non leader replicas, so ignore this
+                      debug("Leader for partition [%s,%d] is %d".format(topicMetadata.topic,
+                        partitionMetadata.partitionId, partitionMetadata.leader.get.id))
+                    case _ =>
+                      debug("Leader for partition [%s,%d] is not available due to %s".format(topicMetadata.topic,
+                        partitionMetadata.partitionId, ErrorMapping.exceptionFor(partitionMetadata.errorCode).getClass.getName))
+                  }
                 }
-              })
-            })
+              case _ =>
+                debug("No partition metadata for topic %s due to %s".format(topicMetadata.topic,
+                  ErrorMapping.exceptionFor(topicMetadata.errorCode).getClass.getName))
+            }
+          }
+          val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
+          topicsMetadata.foreach { tmd =>
+            val topic = tmd.topic
+            tmd.partitionsMetadata.foreach { pmd =>
+              val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
+              if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
+                val leaderBroker = pmd.leader.get
+                leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
+              }
+            }
+          }
 
-          leaderForPartitionsMap.foreach{
+          leaderForPartitionsMap.foreach {
             case(topicAndPartition, leaderBroker) =>
               val pti = partitionMap(topicAndPartition)
               try {
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 5f9c902..1270e92 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -20,9 +20,8 @@ package kafka.consumer
 import kafka.cluster.Broker
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, FetchResponsePartitionData}
+import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData}
 import kafka.common.TopicAndPartition
-import kafka.common.ErrorMapping
 
 
 class ConsumerFetcherThread(name: String,
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 1fbdfc3..bdeee91 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -20,11 +20,7 @@ package kafka.consumer
 import kafka.api._
 import kafka.network._
 import kafka.utils._
-import kafka.utils.ZkUtils._
-import collection.immutable
-import kafka.common.{ErrorMapping, TopicAndPartition, KafkaException}
-import org.I0Itec.zkclient.ZkClient
-import kafka.cluster.Broker
+import kafka.common.{ErrorMapping, TopicAndPartition}
 
 /**
  * A consumer of kafka messages
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 398618f..e66680b 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -303,6 +303,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
     extends IZkChildListener {
+    private val correlationId = new AtomicInteger(0)
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
@@ -403,9 +404,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet,
                                                           brokers,
                                                           config.clientId,
-                                                          config.socketTimeoutMs).topicsMetadata
+                                                          config.socketTimeoutMs,
+                                                          correlationId.getAndIncrement).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
-      topicsMetadata.foreach(m =>{
+      topicsMetadata.foreach(m => {
         val topic = m.topic
         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
         partitionsPerTopicMap.put(topic, partitions)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 3164f78..3562335 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -25,6 +25,7 @@ import kafka.server.KafkaConfig
 import collection.mutable
 import kafka.api._
 import org.apache.log4j.Logger
+import kafka.common.TopicAndPartition
 
 class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@@ -75,6 +76,7 @@ class ControllerChannelManager (private val controllerContext: ControllerContext
 
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
+    debug("Controller %d trying to connect to broker %d".format(config.brokerId,broker.id))
     val channel = new BlockingChannel(broker.host, broker.port,
       BlockingChannel.UseDefaultBufferSize,
       BlockingChannel.UseDefaultBufferSize,
@@ -150,6 +152,9 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
   val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
+  val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition, PartitionStateInfo]]
+  var aliveBrokers: mutable.Set[Int] = new mutable.HashSet[Int]()
+  var allBrokers: mutable.Set[Broker] = new mutable.HashSet[Broker]()
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
   def newBatch() {
@@ -159,14 +164,18 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
         "a new one. Some state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
     leaderAndIsrRequestMap.clear()
     stopReplicaRequestMap.clear()
+    updateMetadataRequestMap.clear()
     stopAndDeleteReplicaRequestMap.clear()
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
-                                       leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicationFactor: Int) {
+                                       leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                                       replicas: Seq[Int]) {
+    val replicationFactor = replicas.size
     brokerIds.foreach { brokerId =>
       leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
-      leaderAndIsrRequestMap(brokerId).put((topic, partition), PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
+      leaderAndIsrRequestMap(brokerId).put((topic, partition),
+        PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor, replicas.toSet))
     }
   }
 
@@ -185,6 +194,32 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
     }
   }
 
+  def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int], controllerContext: ControllerContext,
+                                         partitions:Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+    val partitionList =
+    if(partitions.isEmpty) {
+      controllerContext.partitionLeadershipInfo.keySet
+    } else {
+      partitions
+    }
+    partitionList.foreach { partition =>
+      val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
+      leaderIsrAndControllerEpochOpt match {
+        case Some(leaderIsrAndControllerEpoch) =>
+          val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
+          val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.size, replicas)
+          brokerIds.foreach { brokerId =>
+            updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
+            updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
+          }
+        case None =>
+          info("Leader not assigned yet for partition %s. Skip sending udpate metadata request".format(partition))
+      }
+    }
+    aliveBrokers = new mutable.HashSet[Int] ++ controllerContext.liveBrokerIds
+    allBrokers = new mutable.HashSet[Broker] ++ controllerContext.allBrokers.values.toSet
+  }
+
   def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int, liveBrokers: Set[Broker]) {
     leaderAndIsrRequestMap.foreach { m =>
       val broker = m._1
@@ -202,6 +237,16 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     leaderAndIsrRequestMap.clear()
+    updateMetadataRequestMap.foreach { m =>
+      val broker = m._1
+      val partitionStateInfos = m._2.toMap
+      val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId,
+                                                            partitionStateInfos, aliveBrokers.toSet, allBrokers.toSet)
+      partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
+        "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, updateMetadataRequest, correlationId, broker, p._1)))
+      sendRequest(broker, updateMetadataRequest, null)
+    }
+    updateMetadataRequestMap.clear()
     Seq((stopReplicaRequestMap, false), (stopAndDeleteReplicaRequestMap, true)) foreach {
       case(m, deletePartitions) => {
         m foreach {
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 02510bd..85bb8fe 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -32,13 +32,14 @@ import kafka.utils.{Utils, ZkUtils, Logging}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
+import java.util.concurrent.atomic.AtomicInteger
 import scala.Some
 import kafka.common.TopicAndPartition
-import java.util.concurrent.atomic.AtomicInteger
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
+                        val allBrokers: mutable.Map[Int, Broker] = mutable.Map.empty,
                         var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
                         val brokerShutdownLock: Object = new Object,
                         var epoch: Int = KafkaController.InitialControllerEpoch - 1,
@@ -161,7 +162,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       val partitionsToMove = replicatedPartitionsBrokerLeads().toSet
       debug("Partitions to move leadership from broker %d: %s".format(id, partitionsToMove.mkString(",")))
 
-      partitionsToMove.foreach{ topicAndPartition =>
+      partitionsToMove.foreach { topicAndPartition =>
         val (topic, partition) = topicAndPartition.asTuple
         // move leadership serially to relinquish lock.
         controllerContext.controllerLock synchronized {
@@ -200,7 +201,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                 case Some(updatedLeaderIsrAndControllerEpoch) =>
                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
                     Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
-                    updatedLeaderIsrAndControllerEpoch, replicationFactor)
+                    updatedLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(topicAndPartition))
                 case None =>
                 // ignore
               }
@@ -209,6 +210,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
       }
 
+      // we could just send the metadata for partitions to be moved, but it's safer and not very expensive to send
+      // metadata for all partitions
+      sendUpdateMetadataRequest(controllerContext.allBrokers.keySet.toSeq)
+
       debug("Remaining partitions to move from broker %d: %s".format(id, partitionsRemaining.mkString(",")))
       partitionsRemaining.size
     }
@@ -244,6 +249,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
       initializeAndMaybeTriggerPartitionReassignment()
       initializeAndMaybeTriggerPreferredReplicaElection()
+      /* send partition leadership info to all live brokers */
+      sendUpdateMetadataRequest(controllerContext.liveBrokerIds.toSeq)
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -281,13 +288,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
     // to see if these brokers can become leaders for some/all of those
     partitionStateMachine.triggerOnlinePartitionStateChange()
-
     // check if reassignment of some partitions need to be restarted
     val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter{
       case (topicAndPartition, reassignmentContext) =>
         reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
     }
     partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
+    // send update metadata request for all partitions to the newly restarted brokers
+    sendUpdateMetadataRequest(newBrokers)
   }
 
   /**
@@ -317,6 +325,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // handle dead replicas
     replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
+    // send update metadata request since leaders have changed
+    sendUpdateMetadataRequest(controllerContext.liveBrokerIds.toSeq, partitionsWithoutLeader.toSet)
   }
 
   /**
@@ -344,6 +354,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), NewReplica)
     partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
     replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), OnlineReplica)
+    /* send new partition leadership info to all live brokers */
+    sendUpdateMetadataRequest(controllerContext.liveBrokerIds.toSeq, newPartitions)
   }
 
   /**
@@ -379,6 +391,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         removePartitionFromReassignedPartitions(topicAndPartition)
         info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
         controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
+        sendUpdateMetadataRequest(controllerContext.liveBrokerIds.toSeq, Set(topicAndPartition))
       case false =>
         info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
           "reassigned not yet caught up with the leader")
@@ -397,6 +410,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     } catch {
       case e => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
+      // send partition leadership information to all brokers
+      sendUpdateMetadataRequest(controllerContext.liveBrokerIds.toSeq, partitions)
       removePartitionsFromPreferredReplicaElection(partitions)
     }
   }
@@ -475,6 +490,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   private def initializeControllerContext() {
     controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+    controllerContext.allBrokers ++= controllerContext.liveBrokers.map(b => (b.id -> b))
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
     controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
@@ -662,6 +678,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }.flatten
   }
 
+  private def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+    // send the newly restarted brokers, the leaders for all partitions so that they can correctly respond to metadata requests
+    brokerRequestBatch.newBatch()
+    brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, controllerContext, partitions)
+    brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
+  }
+
   /**
    * Removes a given partition replica from the ISR; if it is not the current
    * leader and there are sufficient remaining replicas in ISR.
@@ -673,7 +696,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch] = {
     val topicAndPartition = TopicAndPartition(topic, partition)
-    debug("Removing replica %d from ISR of %s.".format(replicaId, topicAndPartition))
+    debug("Removing replica %d from ISR %s for partition %s.".format(replicaId,
+      controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr, topicAndPartition))
     var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
@@ -701,6 +725,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             newLeaderAndIsr.zkVersion = newVersion
 
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
+            controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
             if (updateSucceeded)
               info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString()))
             updateSucceeded
@@ -708,6 +733,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
             warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
                  .format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
             finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
+            controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
             true
           }
         case None =>
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index c017727..ac350e9 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -233,7 +233,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param topicAndPartition   The topic/partition whose leader and isr path is to be initialized
    */
   private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
-    debug("Initializing leader and isr for partition %s".format(topicAndPartition))
     val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
     liveAssignedReplicas.size match {
@@ -249,6 +248,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         val leader = liveAssignedReplicas.head
         val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
           controller.epoch)
+        debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
@@ -257,7 +257,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
           brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
-            topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
+            topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
           controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
         } catch {
           case e: ZkNodeExistsException =>
@@ -316,9 +316,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
       stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
                                 .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
+      val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
-        newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
+        newLeaderIsrAndControllerEpoch, replicas)
     } catch {
       case lenne: LeaderElectionNotNeededException => // swallow
       case nroe: NoReplicaOnlineException => throw nroe
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index bea1644..00fd62c 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -121,7 +121,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                 throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                   .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
               brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                                                                  topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
+                                                                  topic, partition, leaderIsrAndControllerEpoch,
+                                                                  replicaAssignment)
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put((topic, partition, replicaId), NewReplica)
@@ -152,7 +153,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                 case Some(leaderIsrAndControllerEpoch) =>
                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
-                    replicaAssignment.size)
+                    replicaAssignment)
                   replicaState.put((topic, partition, replicaId), OnlineReplica)
                   stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
                     .format(controllerId, controller.epoch, replicaId, topicAndPartition))
@@ -173,8 +174,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                     case Some(updatedLeaderIsrAndControllerEpoch) =>
                       // send the shrunk ISR state change request only to the leader
                       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
-                        topic, partition, updatedLeaderIsrAndControllerEpoch,
-                        replicaAssignment.size)
+                        topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                       replicaState.put((topic, partition, replicaId), OfflineReplica)
                       stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
                                                 .format(controllerId, controller.epoch, replicaId, topicAndPartition))
@@ -245,7 +245,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             try {
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet
               val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
-              val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+              val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
+              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
+              controllerContext.allBrokers ++= newBrokers.map(b => (b.id -> b))
               val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
               controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
               info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 7b8d1f0..1437496 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -49,7 +49,7 @@ object RequestChannel extends Logging {
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer = null
     private val requestLogger = Logger.getLogger("kafka.request.logger")
-    trace("Received request : %s".format(requestObj))
+    trace("Processor %d received request : %s".format(processor, requestObj))
 
     def updateRequestMetrics() {
       val endTimeMs = SystemTime.milliseconds
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 5a44c28..d5bd143 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -61,20 +61,19 @@ class SocketServer(val brokerId: Int,
     this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize)
     Utils.newThread("kafka-acceptor", acceptor, false).start()
     acceptor.awaitStartup
-    info("started")
+    info("Started")
   }
 
   /**
    * Shutdown the socket server
    */
   def shutdown() = {
-    info("shutting down")
+    info("Shutting down")
     if(acceptor != null)
       acceptor.shutdown()
     for(processor <- processors)
       processor.shutdown()
-    requestChannel.shutdown
-    info("shut down completely")
+    info("Shutdown completed")
   }
 }
 
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 82e6e4d..13a8aa6 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -54,6 +54,13 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
           }
       }
     val partitionMetadata = metadata.partitionsMetadata
+    if(partitionMetadata.size == 0) {
+      if(metadata.errorCode != ErrorMapping.NoError) {
+        throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
+      } else {
+        throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
+      }
+    }
     partitionMetadata.map { m =>
       m.leader match {
         case Some(leader) =>
@@ -77,7 +84,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{
       trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
-      if(tmd.errorCode == ErrorMapping.NoError){
+      if(tmd.errorCode == ErrorMapping.NoError) {
         topicPartitionInfo.put(tmd.topic, tmd)
       } else
         warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 89cb27d..1a74951 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -52,7 +52,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   def handle(events: Seq[KeyedMessage[K,V]]) {
     lock synchronized {
       val serializedData = serialize(events)
-      serializedData.foreach{
+      serializedData.foreach {
         keyed =>
           val dataSize = keyed.message.payloadSize
           producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
@@ -61,6 +61,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.messageSendMaxRetries + 1
       val correlationIdStart = correlationId.get()
+      debug("Handling %d events".format(events.size))
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
         topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
         if (topicMetadataRefreshInterval >= 0 &&
@@ -70,7 +71,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           lastTopicMetadataRefreshTime = SystemTime.milliseconds
         }
         outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
-        if (outstandingProduceRequests.size > 0)  {
+        if (outstandingProduceRequests.size > 0) {
+          info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
           // back off and update the topic metadata cache before attempting another send operation
           Thread.sleep(config.retryBackoffMs)
           // get topics of the outstanding produce requests and refresh metadata for those
@@ -177,7 +179,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }catch {    // Swallow recoverable exceptions and return None so that they can be retried.
       case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None
       case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None
-      case oe => error("Failed to collate messages by topic, partition due to", oe); throw oe
+      case oe => error("Failed to collate messages by topic, partition due to", oe); None
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f5288bf..57ca629 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -17,11 +17,10 @@
 
 package kafka.server
 
-import kafka.admin.{CreateTopicCommand, AdminUtils}
+import kafka.admin.CreateTopicCommand
 import kafka.api._
 import kafka.message._
 import kafka.network._
-import org.apache.log4j.Logger
 import scala.collection._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
@@ -30,6 +29,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.common._
 import kafka.utils.{ZkUtils, Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
+import kafka.cluster.Broker
 
 
 /**
@@ -45,7 +45,13 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val fetchRequestPurgatory =
     new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
   private val delayedRequestMetrics = new DelayedRequestMetrics
-
+  /* following 3 data structures are updated by the update metadata request
+  * and is queried by the topic metadata request. */
+  var leaderCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
+    new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
+  private var allBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+  private var aliveBrokers: mutable.Set[Int] = new mutable.HashSet[Int]()
+  private val partitionMetadataLock = new Object
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   /**
@@ -61,6 +67,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
         case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
         case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
       }
     } catch {
@@ -84,7 +91,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-
   def handleStopReplicaRequest(request: RequestChannel.Request) {
     val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
@@ -93,6 +99,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
   }
 
+  def handleUpdateMetadataRequest(request: RequestChannel.Request) {
+    val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
+    try {
+      partitionMetadataLock synchronized {
+        // cache the list of alive broker ids in the cluster
+        aliveBrokers = mutable.HashSet() ++ updateMetadataRequest.aliveBrokers
+        // cache the list of all brokers in the cluster
+        updateMetadataRequest.allBrokers.foreach(b => allBrokers.put(b.id, b))
+        updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
+          leaderCache.put(partitionState._1, partitionState._2)
+          debug("Caching leader info %s for remote partition %s".format(partitionState._2, partitionState._1))
+        }
+      }
+      val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId,
+        updateMetadataRequest.partitionStateInfos.map(p => (p._1 -> ErrorMapping.NoError)))
+      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
+    } catch {
+      case e =>
+        val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId,
+          updateMetadataRequest.partitionStateInfos.map(p => (p._1 -> ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]))),
+          ErrorMapping.codeFor(e.getCause.asInstanceOf[Class[Throwable]]))
+        requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
+    }
+  }
+
   /**
    * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
@@ -390,48 +421,97 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = replicaManager.config
-    val uniqueTopics = {
-      if(metadataRequest.topics.size > 0)
-        metadataRequest.topics.toSet
-      else
-        ZkUtils.getAllTopics(zkClient).toSet
-    }
-    val topicMetadataList = AdminUtils.fetchTopicMetadataFromZk(uniqueTopics, zkClient)
-    topicMetadataList.foreach(
-      topicAndMetadata => {
-        topicAndMetadata.errorCode match {
-          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata
-          case ErrorMapping.UnknownTopicOrPartitionCode =>
-            try {
-              /* check if auto creation of topics is turned on */
-              if (config.autoCreateTopicsEnable) {
-                try {
-                  CreateTopicCommand.createTopic(zkClient, topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
-                  info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-                               .format(topicAndMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
-                } catch {
-                  case e: TopicExistsException => // let it go, possibly another broker created this topic
-                }
-                val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicAndMetadata.topic, zkClient)
-                topicsMetadata += newTopicMetadata
-                newTopicMetadata.errorCode match {
-                  case ErrorMapping.NoError =>
-                  case _ => throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topicAndMetadata.topic))
+    var uniqueTopics = Set.empty[String]
+    try {
+      uniqueTopics = {
+        if(metadataRequest.topics.size > 0)
+          metadataRequest.topics.toSet
+        else
+          ZkUtils.getAllTopics(zkClient).toSet
+      }
+      val topicMetadataList =
+        partitionMetadataLock synchronized {
+          uniqueTopics.map { topic =>
+            if(leaderCache.keySet.map(_.topic).contains(topic)) {
+              val partitionReplicaAssignment = leaderCache.filter(p => p._1.topic.equals(topic))
+              val sortedPartitions = partitionReplicaAssignment.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
+              val partitionMetadata = sortedPartitions.map { partitionReplicaMap =>
+                val partition = partitionReplicaMap._1.partition
+                val topicAndPartition = TopicAndPartition(topic, partition)
+                val replicas = leaderCache(topicAndPartition).allReplicas
+                var replicaInfo: Seq[Broker] = replicas.map(allBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+                val partitionStateOpt = leaderCache.get(topicAndPartition)
+                var leaderInfo: Option[Broker] = None
+                var isrInfo: Seq[Broker] = Nil
+                partitionStateOpt match {
+                  case Some(partitionState) =>
+                    val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+                    val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+                    val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+                    debug("[%s,%d]".format(topic,partition) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
+                    try {
+                      if(aliveBrokers.contains(leader))
+                        leaderInfo = Some(allBrokers(leader))
+                      else throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition))
+                      isrInfo = isr.map(allBrokers.getOrElse(_, null)).filter(_ != null)
+                      if(replicaInfo.size < replicas.size)
+                        throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                          replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                      if(isrInfo.size < isr.size)
+                        throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                          isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                      new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+                    } catch {
+                      case e =>
+                        error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
+                        new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
+                          ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+                    }
+                  case None => // it is possible that for a newly created topic/partition, its replicas are assigned, but a
+                    // leader hasn't been assigned yet
+                    debug("[%s,%d]".format(topic,partition) + ";replicas = " + replicas + ", in sync replicas = None, leader = None")
+                    new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.LeaderNotAvailableCode)
                 }
               }
-            } catch {
-              case e => error("Error while retrieving topic metadata", e)
+              new TopicMetadata(topic, partitionMetadata)
+            } else {
+              // topic doesn't exist, send appropriate error code
+              new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
             }
-          case _ => 
-            error("Error while fetching topic metadata for topic " + topicAndMetadata.topic,
-                  ErrorMapping.exceptionFor(topicAndMetadata.errorCode).getCause)
-            topicsMetadata += topicAndMetadata
+          }
         }
-      })
-    trace("Sending topic metadata for correlation id %d to client %s".format(metadataRequest.correlationId, metadataRequest.clientId))
-    topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
-    val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+
+      // handle auto create topics
+      topicMetadataList.foreach { topicMetadata =>
+        topicMetadata.errorCode match {
+          case ErrorMapping.NoError => topicsMetadata += topicMetadata
+          case ErrorMapping.UnknownTopicOrPartitionCode =>
+            if (config.autoCreateTopicsEnable) {
+              try {
+                CreateTopicCommand.createTopic(zkClient, topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor)
+                info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                  .format(topicMetadata.topic, config.numPartitions, config.defaultReplicationFactor))
+              } catch {
+                case e: TopicExistsException => // let it go, possibly another broker created this topic
+              }
+              topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
+            } else {
+              topicsMetadata += topicMetadata
+            }
+          case _ =>
+            error("Error while fetching topic metadata for topic %s due to %s ".format(topicMetadata.topic,
+              ErrorMapping.exceptionFor(topicMetadata.errorCode).getClass.getName))
+            topicsMetadata += topicMetadata
+        }
+      }
+    } catch {
+      case e =>
+        error("Failed to serve TopicMetadataRequest %s due to".format(metadataRequest), e)
+    } finally {
+      trace("Sending topic metadata %s for correlation id %d to client %s".format(topicsMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
+      val response = new TopicMetadataResponse(topicsMetadata.toSeq, metadataRequest.correlationId)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    }
   }
 
   def close() {
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 842dcf3..fed0b86 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -33,14 +33,15 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
       try {
         val req = requestChannel.receiveRequest()
         if(req eq RequestChannel.AllDone) {
-          trace("receives shut down command, shut down".format(brokerId, id))
+          debug("Kafka request handler %d on broker %d received shut down command".format(
+            id, brokerId))
           return
         }
         req.dequeueTimeMs = SystemTime.milliseconds
-        debug("handles request " + req)
+        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
         apis.handle(req)
       } catch {
-        case e: Throwable => error("exception when handling request", e)
+        case e: Throwable => error("Exception when handling request")
       }
     }
   }
@@ -55,12 +56,12 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
-  for(i <- 0 until numThreads) { 
+  for(i <- 0 until numThreads) {
     runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
     threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
     threads(i).start()
   }
-  
+
   def shutdown() {
     info("shutting down")
     for(handler <- runnables)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b4a57c6..f7e13f1 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -38,7 +38,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   var logManager: LogManager = null
   var kafkaZookeeper: KafkaZooKeeper = null
   var replicaManager: ReplicaManager = null
-  private var apis: KafkaApis = null
+  var apis: KafkaApis = null
   var kafkaController: KafkaController = null
   val kafkaScheduler = new KafkaScheduler(4)
   var zkClient: ZkClient = null
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 03f621a..a84da13 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -19,8 +19,8 @@ package kafka.server
 
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
-import kafka.common.{KafkaStorageException, TopicAndPartition, ErrorMapping}
+import kafka.api.{OffsetRequest, FetchResponsePartitionData}
+import kafka.common.{KafkaStorageException, TopicAndPartition}
 
 class ReplicaFetcherThread(name:String,
                            sourceBroker: Broker,
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index b0a0e09..b8d0926 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -160,8 +160,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     // create leaders for all partitions
     TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
-    val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList
+    val actualReplicaList = leaderForPartitionMap.keys.toArray.map(p => (p -> ZkUtils.getReplicasForPartition(zkClient, topic, p))).toMap
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
     for(i <- 0 until actualReplicaList.size)
       assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
@@ -176,39 +175,6 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   }
 
   @Test
-  def testGetTopicMetadata() {
-    val expectedReplicaAssignment = Map(
-      0 -> List(0, 1, 2),
-      1 -> List(1, 2, 3)
-    )
-    val leaderForPartitionMap = Map(
-      0 -> 0,
-      1 -> 1
-    )
-    val topic = "auto-topic"
-    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-    // create leaders for all partitions
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-
-    val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-    newTopicMetadata.errorCode match {
-      case ErrorMapping.UnknownTopicOrPartitionCode =>
-        fail("Topic " + topic + " should've been automatically created")
-      case _ =>
-        assertEquals(topic, newTopicMetadata.topic)
-        assertNotNull("partition metadata list cannot be null", newTopicMetadata.partitionsMetadata)
-        assertEquals("partition metadata list length should be 2", 2, newTopicMetadata.partitionsMetadata.size)
-        val actualReplicaAssignment = newTopicMetadata.partitionsMetadata.map(p => p.replicas)
-        val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id).toList).toList
-        assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
-        for(i <- 0 until actualReplicaList.size) {
-          assertEquals(expectedReplicaAssignment(i), actualReplicaList(i))
-        }
-    }
-  }
-
-  @Test
   def testPartitionReassignmentWithLeaderInNewReplicas() {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
@@ -278,7 +244,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
       CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
         Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 1000)
+    }, 2000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
     // leader should be 2
@@ -360,49 +326,43 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   @Test
   def testShutdownBroker() {
-    info("inside testShutdownBroker")
     val expectedReplicaAssignment = Map(1  -> List(0, 1, 2))
     val topic = "test"
     val partition = 1
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
+    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
 
-    // broker 2 should be the leader since it was started first
-    var leaderBeforeShutdown = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, 1000, None).get
-    var controllerId = ZkUtils.getController(zkClient)
-    var controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+    assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
+       servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition))), 1000))
+
+    val controllerId = ZkUtils.getController(zkClient)
+    val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
     var partitionsRemaining = controller.shutdownBroker(2)
+    // wait for the update metadata request to trickle to the brokers
+    assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
+       servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
     try {
       assertEquals(0, partitionsRemaining)
-      var topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-      var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-      assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-      // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
-      assertEquals(2, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      var partitionStateInfo = servers.head.apis.leaderCache(TopicAndPartition(topic, partition))
+      var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+      assertEquals(0, leaderAfterShutdown)
+      assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
+      assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr)
 
-      leaderBeforeShutdown = leaderAfterShutdown
-      controllerId = ZkUtils.getController(zkClient)
-      controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining)
-      topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-      leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-      assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
-      // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
-      assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      partitionStateInfo = servers.head.apis.leaderCache(TopicAndPartition(topic, partition))
+      leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+      assertEquals(0, leaderAfterShutdown)
 
-      leaderBeforeShutdown = leaderAfterShutdown
-      controllerId = ZkUtils.getController(zkClient)
-      controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
+      assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining)
-      topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
-      leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
-      assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
-      assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      // leader doesn't change since all the replicas are shut down
+      assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
     finally {
       servers.foreach(_.shutdown())
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 0f15718..fad63da 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -87,8 +87,8 @@ object SerializationTestUtils{
   def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = {
     val leaderAndIsr1 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader1, 1, isr1, 1), 1)
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
-    val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
-                  ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
+    val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3, isr1.toSet)),
+                  ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3, isr2.toSet)))
     new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0, "")
   }
 
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 4d989e4..482b71d 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -23,7 +23,6 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import scala.collection._
 import org.scalatest.junit.JUnit3Suite
-import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.serializer._
 import kafka.admin.CreateTopicCommand
@@ -31,7 +30,10 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 import java.util.{Collections, Properties}
+import org.apache.log4j.{Logger, Level}
 import kafka.utils.TestUtils._
+import junit.framework.Assert
+import kafka.common.TopicAndPartition
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -97,6 +99,11 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))), 1000))
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 1))), 1000))
+
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -142,7 +149,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
     val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
     // send some messages to each broker
-    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ 
+    val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++
                         sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
@@ -167,12 +174,17 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ 
+    val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++
                         sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))), 1000))
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 1))), 1000))
+
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -240,9 +252,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
   def testCompressionSetConsumption() {
     // send some messages to each broker
-    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ 
+    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++
                        sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec)
 
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))), 1000))
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 1))), 1000))
+
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
@@ -263,9 +280,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.FATAL)
 
     // send some messages to each broker
-    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ 
+    val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++
                        sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec)
 
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))), 1000))
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 1))), 1000))
+
     val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
 
     waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
@@ -303,6 +325,9 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     // send some messages to each broker
     val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
 
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))), 1000))
+
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
@@ -321,12 +346,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
 
     val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
     assertEquals(sentMessages1, receivedMessages1)
+    zkConsumerConnector1.shutdown()
+    zkClient.close()
   }
 
-  def sendMessagesToBrokerPartition(config: KafkaConfig, 
-                                    topic: String, 
-                                    partition: Int, 
-                                    numMessages: Int, 
+  def sendMessagesToBrokerPartition(config: KafkaConfig,
+                                    topic: String,
+                                    partition: Int,
+                                    numMessages: Int,
                                     compression: CompressionCodec = NoCompressionCodec): List[String] = {
     val header = "test-%d-%d".format(config.brokerId, partition)
     val props = new Properties()
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 4c646f0..2deb439 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -26,6 +26,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
 import kafka.serializer._
 import kafka.producer.{Producer, KeyedMessage}
+import kafka.common.TopicAndPartition
 
 class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -57,7 +58,7 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
   
   def testResetToEarliestWhenOffsetTooLow() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
-    
+
   def testResetToLatestWhenOffsetTooHigh() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
 
@@ -69,12 +70,17 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
    * Returns the count of messages received.
    */
   def resetAndConsume(numMessages: Int, resetTo: String, offset: Long): Int = {
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+
     val producer: Producer[String, Array[Byte]] = TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs), 
         new DefaultEncoder(), new StringEncoder())
 
     for(i <- 0 until numMessages)
       producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes))
 
+    assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))), 1000))
+
     // update offset in zookeeper for consumer to jump "forward" in time
     val dirs = new ZKGroupTopicDirs(group, topic)
     var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
@@ -99,8 +105,10 @@ class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with L
     } catch {
       case e: ConsumerTimeoutException => 
         info("consumer timed out after receiving " + received + " messages.")
+    } finally {
+      producer.close()
+      consumerConnector.shutdown
     }
-    consumerConnector.shutdown
     received
   }
   
diff --git a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
index c4866eb..5371edf 100644
--- a/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
@@ -24,9 +24,11 @@ import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.producer.KeyedMessage
 import kafka.utils._
-import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
+import kafka.common.{TopicAndPartition, ErrorMapping, KafkaException, OffsetOutOfRangeException}
+import kafka.producer.KeyedMessage
+import org.junit.Assert.assertEquals
+import junit.framework.Assert
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -63,6 +65,9 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
 
     producer.send(producerData:_*)
 
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))), 1000))
+
     var fetchedMessage: ByteBufferMessageSet = null
     while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
@@ -90,6 +95,8 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
         val producedData = List("a_" + topic, "b_" + topic)
         messages += topic -> producedData
         producer.send(producedData.map(m => new KeyedMessage[String, String](topic, topic, m)):_*)
+        Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+          servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))), 1000))
         builder.addFetch(topic, offset, 0, 10000)
       }
 
@@ -132,6 +139,8 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
       builder.addFetch(topic, 0, 0, 10000)
     }
     producer.send(produceList: _*)
+    topics.foreach(topic => Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))), 1000)))
 
     // wait a bit for produced message to be available
     val request = builder.build()
@@ -155,6 +164,8 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
       builder.addFetch(topic, 0, 0, 10000)
     }
     producer.send(produceList: _*)
+    topics.foreach(topic => Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0))), 1000)))
 
     producer.send(produceList: _*)
     // wait a bit for produced message to be available
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 2fc08d3..97d07d0 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -27,7 +27,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.CreateTopicCommand
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 import kafka.utils.{TestUtils, Utils}
 
@@ -35,7 +35,7 @@ import kafka.utils.{TestUtils, Utils}
  * End to end tests of the primitive apis against a local server
  */
 class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness {
-  
+
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props)
@@ -301,7 +301,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     val newTopic = "new-topic"
     CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk(newTopic, zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+       servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition(newTopic, 0))), 1000))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 6db63ba..74f7c22 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -22,26 +22,27 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
 import java.nio.ByteBuffer
 import junit.framework.Assert._
-import org.easymock.EasyMock
-import kafka.network._
 import kafka.cluster.Broker
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
-import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
-import kafka.common.ErrorMapping
-import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetadataRequest}
+import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.api.TopicMetadataRequest
+import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.client.ClientUtils
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p))
-  var brokers: Seq[Broker] = null
+  private var server1: KafkaServer = null
+  val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))
 
   override def setUp() {
     super.setUp()
-    brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
+    server1 = TestUtils.createServer(configs.head)
   }
 
   override def tearDown() {
+    server1.shutdown()
     super.tearDown()
   }
 
@@ -65,16 +66,16 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create topic
     val topic = "test"
     CreateTopicCommand.createTopic(zkClient, topic, 1)
-    // set up leader for topic partition 0
-    val leaderForPartitionMap = Map(
-      0 -> configs.head.brokerId
-    )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
-    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    assertTrue("Topic test metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      server1.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0)), 1000))
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
+    var partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
     assertEquals(1, partitionMetadata.head.replicas.size)
@@ -82,60 +83,58 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   def testGetAllTopicMetadata {
     // create topic
-    val topic = "test"
-    CreateTopicCommand.createTopic(zkClient, topic, 1)
-    // set up leader for topic partition 0
-    val leaderForPartitionMap = Map(
-      0 -> configs.head.brokerId
-    )
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
-    val topicMetadataRequest = new TopicMetadataRequest(List(), 0)
-    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
+    val topic1 = "testGetAllTopicMetadata1"
+    val topic2 = "testGetAllTopicMetadata2"
+    CreateTopicCommand.createTopic(zkClient, topic1, 1)
+    CreateTopicCommand.createTopic(zkClient, topic2, 1)
+
+    // wait for leader to be elected for both topics
+    assertTrue("Topic new-topic metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      server1.apis.leaderCache.keySet.contains(TopicAndPartition(topic1, 0)), 1000))
+    assertTrue("Topic new-topic metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      server1.apis.leaderCache.keySet.contains(TopicAndPartition(topic2, 0)), 1000))
+
+    // issue metadata request with empty list of topics
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata",
+      2000, 0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(2, topicsMetadata.size)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.last.partitionsMetadata.head.errorCode)
+    val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
+    val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic1.head.partitionId)
+    assertEquals(1, partitionMetadataTopic1.head.replicas.size)
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
+    assertEquals(1, partitionMetadataTopic2.head.replicas.size)
   }
 
   def testAutoCreateTopic {
     // auto create topic
-    val topic = "test"
-
-    val topicMetadataRequest = new TopicMetadataRequest(List(topic), 0)
-    val topicMetadata = mockLogManagerAndTestTopic(topicMetadataRequest)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    val topic = "testAutoCreateTopic"
+    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
+    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
+    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
+
+    // wait for leader to be elected
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      server1.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0)), 1000))
+
+    // retry the metadata for the auto created topic
+    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+      2000,0).topicsMetadata
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
+    assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
+    var partitionMetadata = topicsMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(0, partitionMetadata.head.replicas.size)
-    assertEquals(None, partitionMetadata.head.leader)
-    assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode)
-  }
-
-  private def mockLogManagerAndTestTopic(request: TopicMetadataRequest): Seq[TopicMetadata] = {
-    // topic metadata request only requires 1 call from the replica manager
-    val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
-    EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
-    EasyMock.replay(replicaManager)
-
-
-    val serializedMetadataRequest = TestUtils.createRequestByteBuffer(request)
-
-    // create the kafka request handler
-    val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
-
-    // call the API (to be tested) to get metadata
-    apis.handleTopicMetadataRequest(new RequestChannel.Request
-      (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeMs=1))
-    val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
-    
-    // check assertions
-    val topicMetadata = TopicMetadataResponse.readFrom(metadataResponse).topicsMetadata
-
-    topicMetadata
+    assertEquals(1, partitionMetadata.head.replicas.size)
+    assertTrue(partitionMetadata.head.leader.isDefined)
   }
 }
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 7f7a8d7..a70e2ac 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -260,10 +260,11 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          topicPartitionInfos = topicPartitionInfos)
     try {
       handler.partitionAndCollate(producerDataList)
-      fail("Should fail with UnknownTopicOrPartitionException")
     }
     catch {
-      case e: UnknownTopicOrPartitionException => // expected, do nothing
+      // should not throw UnknownTopicOrPartitionException to allow resend
+      case e: UnknownTopicOrPartitionException => fail("Should fail with UnknownTopicOrPartitionException")
+
     }
   }
 
@@ -291,10 +292,10 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          topicPartitionInfos = topicPartitionInfos)
     try {
       handler.handle(producerDataList)
-      fail("Should fail with NoBrokersForPartitionException")
+      fail("Should fail with FailedToSendMessageException")
     }
     catch {
-      case e: NoBrokersForPartitionException => // expected, do nothing
+      case e: FailedToSendMessageException => // we retry on any exception now
     }
   }
 
@@ -418,6 +419,8 @@ class AsyncProducerTest extends JUnit3Suite {
     val response2 = ProducerResponse(0,
       Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
+    // don't care about config mock
+    EasyMock.expect(mockSyncProducer.config).andReturn(EasyMock.anyObject()).anyTimes()
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
     EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response1)
     EasyMock.expect(mockSyncProducer.send(request3)).andReturn(response2)
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index bc37531..bd8408b 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -27,10 +27,14 @@ import org.junit.Assert._
 import org.junit.Test
 import kafka.utils._
 import java.util
-import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.admin.CreateTopicCommand
 import util.Properties
 import kafka.api.FetchRequestBuilder
-import kafka.common.{KafkaException, ErrorMapping, FailedToSendMessageException}
+import kafka.common.{TopicAndPartition, ErrorMapping, FailedToSendMessageException}
+import org.junit.Assert.assertTrue
+import org.junit.Assert.assertFalse
+import org.junit.Assert.assertEquals
+import org.junit.Assert.fail
 
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
@@ -43,6 +47,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
+  private var servers = List.empty[KafkaServer]
 
   private val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
   private val config1 = new KafkaConfig(props1) {
@@ -60,6 +65,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     // set up 2 brokers with 4 partitions each
     server1 = TestUtils.createServer(config1)
     server2 = TestUtils.createServer(config2)
+    servers = List(server1,server2)
 
     val props = new Properties()
     props.put("host", "localhost")
@@ -68,7 +74,6 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024, "")
     consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024, "")
 
-
     // temporarily set request handler logger to a higher level
     requestHandlerLogger.setLevel(Level.FATAL)
   }
@@ -88,8 +93,9 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
   def testUpdateBrokerPartitionInfo() {
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
+    // wait until the update metadata request for new topic reaches all servers
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition("new-topic", 0))), 1000))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     val props1 = new util.Properties()
@@ -101,7 +107,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
       producer1.send(new KeyedMessage[String, String]("new-topic", "test", "test1"))
       fail("Test should fail because the broker list provided are not valid")
     } catch {
-      case e: KafkaException =>
+      case e: FailedToSendMessageException =>
       case oe => fail("fails with exception", oe)
     } finally {
       producer1.close()
@@ -154,7 +160,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     // create topic with 1 partition and await leadership
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition("new-topic", 0))), 1000))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     val producer1 = new Producer[String, String](producerConfig1)
@@ -204,8 +210,9 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     // create topic
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
+    // waiting for 1 partition is enough
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition("new-topic", 0))), 1000))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500)
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500)
@@ -259,14 +266,14 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
     props.put("request.timeout.ms", String.valueOf(timeoutMs))
     props.put("metadata.broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     props.put("request.required.acks", "1")
-
+    props.put("client.id","ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout")
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
 
     // create topics in ZK
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
-      AdminUtils.fetchTopicMetadataFromZk("new-topic", zkClient).errorCode != ErrorMapping.UnknownTopicOrPartitionCode, zookeeper.tickTime))
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(TopicAndPartition("new-topic", 0))), 1000))
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     // do a simple test to make sure plumbing is okay
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 8f88177..7c5ecf3 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -132,7 +132,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch]
     leaderAndIsr.put((topic, partitionId),
       new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
-    val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
+    val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1, Set(0,1))).toMap
     val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId,
                                                       staleControllerEpoch, 0, "")
 
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 3728f8c..0c56848 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -29,6 +29,8 @@ import kafka.utils.TestUtils._
 import kafka.admin.CreateTopicCommand
 import kafka.api.FetchRequestBuilder
 import kafka.utils.{TestUtils, Utils}
+import junit.framework.Assert
+import kafka.common.TopicAndPartition
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
@@ -50,6 +52,9 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // create topic
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0)), 1000))
+
     // send some messages
     producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
 
@@ -65,11 +70,13 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     server = new KafkaServer(config)
     server.startup()
 
+    // wait for the broker to receive the update metadata request after startup
+    Assert.assertTrue("Topic " + topic + " metadata not propagated after timeout", TestUtils.waitUntilTrue(() =>
+      server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0)), 1000))
+
     producer = new Producer[Int, String](new ProducerConfig(producerConfig))
     val consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
 
-    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
-
     var fetchedMessage: ByteBufferMessageSet = null
     while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1557047..c7dd8a7 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -20,7 +20,7 @@ import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.utils.{Time, TestUtils, MockTime}
+import kafka.utils.{ZkUtils, Time, TestUtils, MockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
@@ -57,7 +57,9 @@ class SimpleFetchTest extends JUnit3Suite {
     val fetchSize = 100
     val messages = new Message("test-message".getBytes())
 
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    // create nice mock since we don't particularly care about zkclient calls
+    val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
+    EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false)
     EasyMock.replay(zkClient)
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
@@ -151,7 +153,8 @@ class SimpleFetchTest extends JUnit3Suite {
     val followerReplicaId = configs(1).brokerId
     val followerLEO = 15
 
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val zkClient = EasyMock.createNiceMock(classOf[ZkClient])
+    EasyMock.expect(zkClient.exists(ZkUtils.ControllerEpochPath)).andReturn(false)
     EasyMock.replay(zkClient)
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
