Index: system_test/single_host_multi_brokers/config/server.properties
===================================================================
--- system_test/single_host_multi_brokers/config/server.properties	(revision 1362267)
+++ system_test/single_host_multi_brokers/config/server.properties	(working copy)
@@ -96,7 +96,8 @@
 #log.retention.size=1073741824
 
 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.file.size=536870912
+#log.file.size=536870912
+log.file.size=100
 
 # The interval at which log segments are checked to see if they can be deleted according 
 # to the retention policies
Index: system_test/single_host_multi_brokers/config/log4j.properties
===================================================================
--- system_test/single_host_multi_brokers/config/log4j.properties	(revision 1362267)
+++ system_test/single_host_multi_brokers/config/log4j.properties	(working copy)
@@ -31,6 +31,9 @@
 # to print message checksum from ProducerPerformance
 log4j.logger.kafka.perf=DEBUG
 log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG
+log4j.logger.kafka.producer.async=DEBUG
+log4j.logger.kafka.server.KafkaApis=TRACE
+log4j.logger.kafka.server.ReplicaManager=DEBUG
 
 # to print message checksum from ProducerPerformance
 log4j.logger.kafka.perf=DEBUG
Index: system_test/single_host_multi_brokers/bin/run-test.sh
===================================================================
--- system_test/single_host_multi_brokers/bin/run-test.sh	(revision 1362267)
+++ system_test/single_host_multi_brokers/bin/run-test.sh	(working copy)
@@ -42,7 +42,7 @@
 readonly replica_factor=3                             # should be less than or equal to "num_kafka_server"
 readonly my_brokerid_to_start=0                       # this should be '0' for now
 readonly my_server_port_to_start=9091                 # if using this default, the ports to be used will be 9091, 9092, ...
-readonly producer_msg_batch_size=200                  # batch no. of messsages by producer
+readonly producer_msg_batch_size=20                   # batch no. of messsages by producer
 readonly consumer_timeout_ms=10000                    # elapsed time for consumer to timeout and exit
 
 # ====================================
@@ -198,7 +198,7 @@
         --topic ${this_topic} \
         --messages $no_msg_to_produce \
         --message-size 100 \
-        --threads 5 \
+        --threads 1 \
         --initial-message-id $init_msg_id \
         2>&1 >> $producer_perf_log_pathname
 }
@@ -213,6 +213,7 @@
         --topic $this_consumer_topic \
         --formatter 'kafka.consumer.ConsoleConsumer$DecodedMessageFormatter' \
         --consumer-timeout-ms $consumer_timeout_ms \
+        --from-beginning \
         2>&1 >> $console_consumer_log_pathname &
 }
 
@@ -387,20 +388,16 @@
         ldr_bkr_id=$?
         info "leader broker id: $ldr_bkr_id"
 
-        svr_idx=$(($ldr_bkr_id + 1))
+        svr_idx=$(($ldr_bkr_id))
 
         # ==========================================================
         # If KAFKA-350 is fixed, uncomment the following 3 lines to
         # STOP the server for failure test
         # ==========================================================
-        #stop_server $svr_idx
-        #info "sleeping for 10s"
-        #sleep 10
+        # stop_server $svr_idx
+        # info "sleeping for 10s"
+        # sleep 10
 
-        start_console_consumer $test_topic localhost:$zk_port
-        info "sleeping for 5s"
-        sleep 5
-
         init_id=$(( ($i - 1) * $producer_msg_batch_size ))
         start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size $init_id
         info "sleeping for 15s"
@@ -411,11 +408,15 @@
         # If KAFKA-350 is fixed, uncomment the following 3 lines to
         # START the server for failure test
         # ==========================================================
-        #start_server $svr_idx
-        #info "sleeping for 30s"
-        #sleep 30
+        # start_server $svr_idx
+        # info "sleeping for 30s"
+        # sleep 30
     done
 
+    start_console_consumer $test_topic localhost:$zk_port
+    info "sleeping for 15s"
+    sleep 15
+
     validate_results
     echo
 
Index: system_test/common/util.sh
===================================================================
--- system_test/common/util.sh	(revision 1362267)
+++ system_test/common/util.sh	(working copy)
@@ -120,7 +120,7 @@
         # ======================
         keyword_to_replace="brokerid="
         string_to_be_replaced=`echo "$server_properties" | grep $keyword_to_replace`
-        brokerid_idx=$(( $brokerid_to_start + $i - 1 ))
+        brokerid_idx=$(( $brokerid_to_start + $i))
         string_to_replace="${keyword_to_replace}${brokerid_idx}"
         # info "string to be replaced : [${string_to_be_replaced}]"
         # info "string to replace     : [${string_to_replace}]"
Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala	(working copy)
@@ -293,6 +293,18 @@
     new Producer[K, V](new ProducerConfig(props))
   }
 
+  def getProducerConfig(zkConnect: String, bufferSize: Int, connectTimeout: Int,
+                        reconnectInterval: Int): Properties = {
+    val props = new Properties()
+    props.put("producer.type", "sync")
+    props.put("zk.connect", zkConnect)
+    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
+    props.put("buffer.size", bufferSize.toString)
+    props.put("connect.timeout.ms", connectTimeout.toString)
+    props.put("reconnect.interval", reconnectInterval.toString)
+    props
+  }
+
   def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
     val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
     ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
@@ -368,6 +380,11 @@
     pr
   }
 
+  def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
+    leaderPerPartitionMap.foreach(leaderForPartition => ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic,
+      leaderForPartition._1, leaderForPartition._2))
+  }
+
   def waitUntilLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
     val leaderLock = new ReentrantLock()
     val leaderExists = leaderLock.newCondition()
@@ -381,7 +398,8 @@
         case Some(l) => info("Leader %d exists for topic %s partition %d".format(l, topic, partition))
           leader
         case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString),
-          new LeaderExists(topic, partition, leaderExists))
+          new LeaderExists(topic, partition, leaderLock, leaderExists))
+        info("No leader exists. Waiting for %d ms".format(timeoutMs))
         leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
           // check if leader is elected
         val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(working copy)
@@ -25,11 +25,12 @@
 import junit.framework.Assert._
 import org.easymock.EasyMock
 import kafka.network._
-import kafka.api.{TopicMetaDataResponse, TopicMetadataRequest}
 import kafka.cluster.Broker
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
+import kafka.common.ErrorMapping
+import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
 
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -66,18 +67,38 @@
     // create topic
     val topic = "test"
     CreateTopicCommand.createTopic(zkClient, topic, 1)
-
-    mockLogManagerAndTestTopic(topic)
+    // set up leader for topic partition 0
+    val leaderForPartitionMap = Map(
+      0 -> configs.head.brokerId
+    )
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+    val topicMetadata = mockLogManagerAndTestTopic(topic)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
+    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
+    assertEquals(1, partitionMetadata.head.replicas.size)
   }
 
   def testAutoCreateTopic {
     // auto create topic
     val topic = "test"
 
-    mockLogManagerAndTestTopic(topic)
+    val topicMetadata = mockLogManagerAndTestTopic(topic)
+    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
+    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
+    val partitionMetadata = topicMetadata.head.partitionsMetadata
+    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
+    assertEquals(0, partitionMetadata.head.replicas.size)
+    assertEquals(None, partitionMetadata.head.leader)
+    assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode)
   }
 
-  private def mockLogManagerAndTestTopic(topic: String) = {
+  private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = {
     // topic metadata request only requires 2 APIs from the log manager
     val logManager = EasyMock.createMock(classOf[LogManager])
     val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
@@ -109,17 +130,12 @@
     
     // check assertions
     val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
-    assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
-    val partitionMetadata = topicMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(brokers, partitionMetadata.head.replicas)
-    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
 
     // verify the expected calls to log manager occurred in the right order
     EasyMock.verify(logManager)
     EasyMock.verify(kafkaZookeeper)
     EasyMock.verify(receivedRequest)
+
+    topicMetadata
   }
 }
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -32,7 +32,7 @@
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.admin.CreateTopicCommand
-import kafka.common.{InvalidPartitionException, NotLeaderForPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.common.{InvalidPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(working copy)
@@ -133,6 +133,8 @@
       produceList ::= new ProducerData[String, Message](topic, topic, set)
       builder.addFetch(topic, 0, 0, 10000)
     }
+    // wait until leader is elected
+    topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500))
     producer.send(produceList: _*)
 
     // wait a bit for produced message to be available
@@ -157,6 +159,9 @@
       produceList ::= new ProducerData[String, Message](topic, topic, set)
       builder.addFetch(topic, 0, 0, 10000)
     }
+    // wait until leader is elected
+    topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1500))
+
     producer.send(produceList: _*)
 
     producer.send(produceList: _*)
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(working copy)
@@ -132,7 +132,7 @@
     Assert.assertEquals(request.correlationId, response.correlationId)
     Assert.assertEquals(response.errors.length, response.offsets.length)
     Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, _))
+    response.errors.foreach(Assert.assertEquals(ErrorMapping.LeaderNotAvailableCode.toShort, _))
     response.offsets.foreach(Assert.assertEquals(-1L, _))
 
     // #2 - test that we get correct offsets when partition is owned by broker
@@ -155,7 +155,7 @@
     Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, response2.errors(1))
+    Assert.assertEquals(ErrorMapping.LeaderNotAvailableCode.toShort, response2.errors(1))
     Assert.assertEquals(-1, response2.offsets(1))
   }
 
Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(working copy)
@@ -212,14 +212,14 @@
     topic2Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes))))
     val topic2Broker2Data = new ListBuffer[ProducerData[Int,Message]]
     topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
-    val expectedResult = Map(
+    val expectedResult = Some(Map(
         0 -> Map(
               ("topic1", 0) -> topic1Broker1Data,
               ("topic2", 0) -> topic2Broker1Data),
         1 -> Map(
               ("topic1", 1) -> topic1Broker2Data,
               ("topic2", 1) -> topic2Broker2Data)
-      )
+      ))
 
     val actualResult = handler.partitionAndCollate(producerDataList)
     assertEquals(expectedResult, actualResult)
@@ -356,10 +356,15 @@
     producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
     producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg3".getBytes)))
 
-    val partitionedData = handler.partitionAndCollate(producerDataList)
-    for ((brokerId, dataPerBroker) <- partitionedData) {
-      for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
-        assertTrue(partitionId == 0)
+    val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
+    partitionedDataOpt match {
+      case Some(partitionedData) =>
+        for ((brokerId, dataPerBroker) <- partitionedData) {
+          for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
+            assertTrue(partitionId == 0)
+        }
+      case None =>
+        fail("Failed to collate requests by topic, partition")
     }
     EasyMock.verify(producerPool)
   }
Index: core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala	(working copy)
@@ -26,7 +26,7 @@
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
-  val tickTime = 2000
+  val tickTime = 500
   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
   val port = connectString.split(":")(1).toInt
   val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
Index: core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala	(working copy)
@@ -28,8 +28,8 @@
 
   override def setUp() {
     zookeeper = new EmbeddedZookeeper(zkConnect)
-    zkClient = new ZkClient(zookeeper.connectString)
-    zkClient.setZkSerializer(ZKStringSerializer)
+    Thread.sleep(200)
+    zkClient = new ZkClient(zookeeper.connectString, 1000, 2000, ZKStringSerializer)
     super.setUp
   }
 
Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(working copy)
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.server
 
@@ -51,7 +51,6 @@
     val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
 
     servers ++= List(server1, server2)
-    try {
     // start 2 brokers
     val topic = "new-topic"
     val partitionId = 0
@@ -66,14 +65,13 @@
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
     // kill the server hosting the preferred replica
-    servers.head.shutdown()
+    server1.shutdown()
 
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
-    Thread.sleep(zookeeper.tickTime)
-
+    val leaderPath = zkClient.getChildren(ZkUtils.getTopicPartitionPath(topic, "0"))
     // bring the preferred replica back
     servers.head.startup()
 
@@ -86,14 +84,9 @@
 
     // test if the leader is the preferred replica
     assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
-    }catch {
-      case e => error("Error while running leader election test ", e)
-    } finally {
-      // shutdown the servers and delete data hosted on them
-      servers.map(server => server.shutdown())
-      servers.map(server => Utils.rm(server.config.logDir))
-
-    }
+    // shutdown the servers and delete data hosted on them
+    servers.map(server => server.shutdown())
+    servers.map(server => Utils.rm(server.config.logDir))
   }
 
   // Assuming leader election happens correctly, test if epoch changes as expected
@@ -105,27 +98,23 @@
     // setup 2 brokers in ZK
     val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
 
-    try {
-      // create topic with 1 partition, 2 replicas, one on each broker
-      CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
 
-      var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
-      assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
+    var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+    assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
+    assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
 
-      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
-      newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
-      assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
+    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+    newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
+    assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
+    assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
 
-      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
-      newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
-      assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
-      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
+    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+    newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+    assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
+    assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
 
-    }finally {
-      TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
-    }
-
+    TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
   }
 }
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala	(revision 0)
@@ -0,0 +1,297 @@
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Assert._
+import kafka.admin.CreateTopicCommand
+import kafka.utils.TestUtils._
+import kafka.utils.{Utils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.message.Message
+import java.io.RandomAccessFile
+import kafka.producer.{ProducerConfig, ProducerData, Producer}
+import org.junit.Test
+
+class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val keepInSyncTimeMs = 5000L
+    override val keepInSyncBytes = 10L
+    override val flushInterval = 10
+    override val replicaMinBytes = 20
+  })
+  val topic = "new-topic"
+  val partitionId = 0
+
+  val brokerId1 = 0
+  val brokerId2 = 1
+
+  val port1 = TestUtils.choosePort()
+  val port2 = TestUtils.choosePort()
+
+  var server1: KafkaServer = null
+  var server2: KafkaServer = null
+
+  val configProps1 = configs.head
+  val configProps2 = configs.last
+
+  val server1HWFile = configProps1.logDir + "/" + topic + "-0/highwatermark"
+  val server2HWFile = configProps2.logDir + "/" + topic + "-0/highwatermark"
+
+  //  val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
+  val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
+  val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
+
+  var producer: Producer[Int, Message] = null
+  var hwFile1: RandomAccessFile = null
+  var hwFile2: RandomAccessFile = null
+  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+
+  @Test
+  def testEmptyTest() {
+    assert(1 == 1)
+  }
+
+  @Test
+  def testHWCheckpointNoFailuresSingleLogSegment {
+    // start both servers
+    println("\n\nStarting both servers")
+    server1 = TestUtils.createServer(configProps1)
+    server2 = TestUtils.createServer(configProps2)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.required.acks", "-1")
+    println("\n\nCreating producer")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    println("\n\nCreating new topic")
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+
+    println("\n\nSending messages")
+    sendMessages()
+
+    hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    println("\n\nSending messages")
+    sendMessages()
+    // don't wait for follower to read the leader's hw
+    // shutdown the servers to allow the hw to be checkpointed
+    println("\n\nShutting down servers")
+    servers.map(server => server.shutdown())
+    println("\n\nClosing producer")
+    producer.close()
+    val leaderHW = readHW(hwFile1)
+    assertEquals(60L, leaderHW)
+    val followerHW = readHW(hwFile2)
+    assertEquals(30L, followerHW)
+    hwFile1.close()
+    hwFile2.close()
+    servers.map(server => Utils.rm(server.config.logDir))
+  }
+
+  def testHWCheckpointWithFailuresSingleLogSegment {
+    // start both servers
+    server1 = TestUtils.createServer(configProps1)
+    server2 = TestUtils.createServer(configProps2)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+
+    hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    assertEquals(0L, readHW(hwFile1))
+
+    sendMessages()
+
+    // kill the server hosting the preferred replica
+    server1.shutdown()
+    assertEquals(30L, readHW(hwFile1))
+
+    // check if leader moves to the other server
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+
+    // bring the preferred replica back
+    server1.startup()
+
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
+
+    assertEquals(30L, readHW(hwFile1))
+    // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
+    server2.shutdown()
+    assertEquals(30L, readHW(hwFile2))
+
+    server2.startup()
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertEquals("Leader must remain on broker 0", 0, leader.getOrElse(-1))
+
+    sendMessages()
+    // give some time for follower 1 to record leader HW of 60
+    Thread.sleep(500)
+    // shutdown the servers to allow the hw to be checkpointed
+    servers.map(server => server.shutdown())
+    Thread.sleep(200)
+    producer.close()
+    assert(hwFile1.length() > 0)
+    assert(hwFile2.length() > 0)
+    assertEquals(60L, readHW(hwFile1))
+    assertEquals(60L, readHW(hwFile2))
+    hwFile1.close()
+    hwFile2.close()
+    servers.map(server => Utils.rm(server.config.logDir))
+  }
+
+  def testHWCheckpointNoFailuresMultipleLogSegments {
+    val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+      override val keepInSyncTimeMs = 5000L
+      override val keepInSyncBytes = 10L
+      override val flushInterval = 10
+      override val replicaMinBytes = 20
+      override val logFileSize = 30
+    })
+
+    val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
+    val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
+
+    // start both servers
+    server1 = TestUtils.createServer(configs.head)
+    server2 = TestUtils.createServer(configs.last)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+
+    sendMessages(10)
+
+    hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    sendMessages(10)
+
+    // give some time for follower 1 to record leader HW of 600
+    Thread.sleep(500)
+    // shutdown the servers to allow the hw to be checkpointed
+    servers.map(server => server.shutdown())
+    producer.close()
+    val leaderHW = readHW(hwFile1)
+    assertEquals(600L, leaderHW)
+    val followerHW = readHW(hwFile2)
+    assertEquals(600L, followerHW)
+    hwFile1.close()
+    hwFile2.close()
+    servers.map(server => Utils.rm(server.config.logDir))
+  }
+
+  def testHWCheckpointWithFailuresMultipleLogSegments {
+    val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+      override val keepInSyncTimeMs = 5000L
+      override val keepInSyncBytes = 10L
+      override val flushInterval = 1000
+      override val flushSchedulerThreadRate = 10
+      override val replicaMinBytes = 20
+      override val logFileSize = 30
+    })
+
+    val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
+    val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
+
+    // start both servers
+    server1 = TestUtils.createServer(configs.head)
+    server2 = TestUtils.createServer(configs.last)
+    servers ++= List(server1, server2)
+
+    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    producerProps.put("producer.request.required.acks", "-1")
+    producer = new Producer[Int, Message](new ProducerConfig(producerProps))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
+
+    // wait until leader is elected
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+
+    val hwFile1 = new RandomAccessFile(server1HWFile, "r")
+    val hwFile2 = new RandomAccessFile(server2HWFile, "r")
+
+    sendMessages(2)
+    // allow some time for the follower to get the leader HW
+    Thread.sleep(1000)
+    // kill the server hosting the preferred replica
+    server1.shutdown()
+    server2.shutdown()
+    assertEquals(60L, readHW(hwFile1))
+    assertEquals(60L, readHW(hwFile2))
+
+    server2.startup()
+    // check if leader moves to the other server
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+
+    assertEquals(60L, readHW(hwFile1))
+
+    // bring the preferred replica back
+    server1.startup()
+
+    assertEquals(60L, readHW(hwFile1))
+    assertEquals(60L, readHW(hwFile2))
+
+    sendMessages(2)
+    // allow some time for the follower to get the leader HW
+    Thread.sleep(1000)
+    // shutdown the servers to allow the hw to be checkpointed
+    servers.map(server => server.shutdown())
+    producer.close()
+    assert(hwFile1.length() > 0)
+    assert(hwFile2.length() > 0)
+    assertEquals(120L, readHW(hwFile1))
+    assertEquals(120L, readHW(hwFile2))
+    hwFile1.close()
+    hwFile2.close()
+    servers.map(server => Utils.rm(server.config.logDir))
+  }
+
+  private def sendMessages(numMessages: Int = 1) {
+    for(i <- 0 until numMessages) {
+      producer.send(new ProducerData[Int, Message](topic, 0, sent1))
+    }
+  }
+
+  private def readHW(hwFile: RandomAccessFile): Long = {
+    hwFile.seek(0)
+    hwFile.readLong()
+  }
+}
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(working copy)
@@ -18,9 +18,7 @@
 
 import java.io.File
 import kafka.consumer.SimpleConsumer
-import java.util.Properties
 import org.junit.Test
-import org.scalatest.junit.JUnitSuite
 import junit.framework.Assert._
 import kafka.message.{Message, ByteBufferMessageSet}
 import org.scalatest.junit.JUnit3Suite
@@ -51,7 +49,7 @@
       // create topic
       CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
 
-      val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
+      val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
 
       // send some messages
       producer.send(new ProducerData[Int, Message](topic, 0, sent1))
@@ -66,7 +64,7 @@
 
 
     {
-      val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
+      val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
       val consumer = new SimpleConsumer(host,
                                         port,
                                         1000000,
@@ -103,15 +101,4 @@
     }
 
   }
-
-  private def getProducerConfig(bufferSize: Int, connectTimeout: Int,
-                                reconnectInterval: Int): ProducerConfig = {
-    val props = new Properties()
-    props.put("zk.connect", zkConnect)
-    props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
-    props.put("buffer.size", bufferSize.toString)
-    props.put("connect.timeout.ms", connectTimeout.toString)
-    props.put("reconnect.interval", reconnectInterval.toString)
-    new ProducerConfig(props)
-  }
 }
Index: core/src/test/scala/unit/kafka/admin/AdminTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/admin/AdminTest.scala	(revision 1362267)
+++ core/src/test/scala/unit/kafka/admin/AdminTest.scala	(working copy)
@@ -20,6 +20,7 @@
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
+import kafka.common.ErrorMapping
 import kafka.utils.TestUtils
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -137,13 +138,28 @@
       10 -> List("1", "2", "3"),
       11 -> List("1", "3", "4")
     )
+    val leaderForPartitionMap = Map(
+      0 -> 0,
+      1 -> 1,
+      2 -> 2,
+      3 -> 3,
+      4 -> 4,
+      5 -> 0,
+      6 -> 1,
+      7 -> 2,
+      8 -> 3,
+      9 -> 4,
+      10 -> 1,
+      11 -> 1
+    )
+    val topic = "test"
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
-
-    val topic = "test"
     // create the topic
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    // create leaders for all partitions
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
     val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-                                  .get.partitionsMetadata.map(p => p.replicas)
+                                  .partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
     assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
     for( i <- 0 until actualReplicaList.size ) {
@@ -165,23 +181,30 @@
       0 -> List("0", "1", "2"),
       1 -> List("1", "2", "3")
     )
+    val leaderForPartitionMap = Map(
+      0 -> 0,
+      1 -> 1
+    )
     val topic = "auto-topic"
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
     AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    // create leaders for all partitions
+    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
 
     val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-    newTopicMetadata match {
-      case Some(metadata) =>
-        assertEquals(topic, metadata.topic)
-        assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
-        assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
-        val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
+    newTopicMetadata.errorCode match {
+      case ErrorMapping.UnknownTopicCode =>
+        fail("Topic " + topic + " should've been automatically created")
+      case _ =>
+        assertEquals(topic, newTopicMetadata.topic)
+        assertNotNull("partition metadata list cannot be null", newTopicMetadata.partitionsMetadata)
+        assertEquals("partition metadata list length should be 2", 2, newTopicMetadata.partitionsMetadata.size)
+        val actualReplicaAssignment = newTopicMetadata.partitionsMetadata.map(p => p.replicas)
         val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
         assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
         for(i <- 0 until actualReplicaList.size) {
           assertEquals(expectedReplicaAssignment(i), actualReplicaList(i))
         }
-      case None => fail("Topic " + topic + " should've been automatically created")
     }
   }
 }
Index: core/src/main/scala/kafka/log/LogStats.scala
===================================================================
--- core/src/main/scala/kafka/log/LogStats.scala	(revision 1362267)
+++ core/src/main/scala/kafka/log/LogStats.scala	(working copy)
@@ -36,7 +36,7 @@
   
   def getNumberOfSegments: Int = log.numberOfSegments
   
-  def getCurrentOffset: Long = log.highwaterMark
+  def getCurrentOffset: Long = log.getHW()
   
   def getNumAppendedMessages: Long = numCumulatedMessages.get
 
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1362267)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -94,8 +94,19 @@
  */
 class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
   @volatile var deleted = false
-  def size: Long = messageSet.highWaterMark
+  def size: Long = messageSet.sizeInBytes()
   override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
+
+  /**
+   * Truncate this log segment upto absolute offset value. Since the offset specified is absolute, to compute the amount
+   * of data to be deleted, we have to compute the offset relative to start of the log segment
+   * @param offset Absolute offset for this partition
+   */
+  def truncateUpto(offset: Long) = {
+    assert(offset >= start, "Offset %d used for truncating this log segment cannot be smaller than the start offset %d".
+                            format(offset, start))
+    messageSet.truncateUpto(offset - start)
+  }
 }
 
 
@@ -120,11 +131,14 @@
   /* The actual segments of the log */
   private[log] val segments: SegmentList[LogSegment] = loadSegments()
 
-
   /* create the leader highwatermark file handle */
   private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw")
+  info("Created highwatermark file %s for log %s".format(dir.getAbsolutePath + "/" + hwFileName, name))
 
-  private var hw: Long = 0
+  /**
+   * If hw file is absent, the hw defaults to 0. If it exists, hw is set to the checkpointed value
+   */
+  private var hw: Long = if(hwFile.length() > 0) hwFile.readLong() else { hwFile.writeLong(0); 0 }
 
   private val logStats = new LogStats(this)
 
@@ -259,6 +273,7 @@
    * Read from the log file at the given offset
    */
   def read(offset: Long, length: Int): MessageSet = {
+    trace("Reading %d bytes from offset %d in log %s of length %s bytes".format(length, offset, name, size))
     val view = segments.view
     Log.findRange(view, offset, view.length) match {
       case Some(segment) => segment.messageSet.read((offset - segment.start), length)
@@ -306,16 +321,11 @@
   }
 
   /**
-   *  get the current high watermark of the log
+   *  get the absolute offset of the last message in the log
    */
-  def highwaterMark: Long = segments.view.last.messageSet.highWaterMark
+  def logEndOffset: Long = segments.view.last.start + segments.view.last.messageSet.getEndOffset()
 
   /**
-   *  get the offset of the last message in the log
-   */
-  def logEndOffset: Long = segments.view.last.messageSet.getEndOffset()
-
-  /**
    * Roll the log over if necessary
    */
   private def maybeRoll(segment: LogSegment) {
@@ -376,7 +386,7 @@
     for (i <- 0 until segsArray.length)
       offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
     if (segsArray.last.size > 0)
-      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
+      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), SystemTime.milliseconds)
 
     var startIndex = -1
     request.time match {
@@ -438,35 +448,31 @@
       // read the last checkpointed hw from disk
       hwFile.seek(0)
       val lastKnownHW = hwFile.readLong()
+      info("Recovering log %s upto highwatermark %d".format(name, lastKnownHW))
+      segments.view.foreach { segment =>
+        debug("Segment start offset = %d. Segment end offset = %d".format(segment.start, segment.messageSet.getEndOffset()))
+      }
+
       // find the log segment that has this hw
       val segmentToBeTruncated = segments.view.find(segment =>
         lastKnownHW >= segment.start && lastKnownHW < segment.messageSet.getEndOffset())
 
       segmentToBeTruncated match {
         case Some(segment) =>
-          val truncatedSegmentIndex = segments.view.indexOf(segment)
-          segments.truncLast(truncatedSegmentIndex)
-        case None =>
-      }
-
-      segmentToBeTruncated match {
-        case Some(segment) =>
-          segment.messageSet.truncateUpto(lastKnownHW)
+          segment.truncateUpto(lastKnownHW)
           info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, hw))
         case None =>
-          assert(lastKnownHW <= segments.view.last.messageSet.size,
+          assert(lastKnownHW <= logEndOffset,
             "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
-              format(lastKnownHW, segments.view.last.messageSet.size, segments.view.last.file.getAbsolutePath))
+              format(lastKnownHW, segments.view.last.messageSet.getEndOffset(), segments.view.last.file.getAbsolutePath))
           error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
-            .format(lastKnownHW, segments.view.head.start, segments.view.last.messageSet.size))
+            .format(lastKnownHW, segments.view.head.start, logEndOffset))
       }
 
-      val segmentsToBeDeleted = segments.view.filter(segment => segment.start >= lastKnownHW)
-      if(segmentsToBeDeleted.size < segments.view.size) {
+      val segmentsToBeDeleted = segments.view.filter(segment => segment.start > lastKnownHW)
       val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
       if(numSegmentsDeleted != segmentsToBeDeleted.size)
         error("Failed to delete some segments during log recovery")
-      }
     }else
       info("Unable to recover log upto hw. No previously checkpointed high watermark found for " + name)
   }
@@ -475,10 +481,13 @@
     hw = latestLeaderHW
   }
 
+  def getHW(): Long = hw
+
   def checkpointHW() {
     hwFile.seek(0)
     hwFile.writeLong(hw)
     hwFile.getChannel.force(true)
+    info("Checkpointed highwatermark %d for log %s".format(hw, name))
   }
 
   def topicName():String = {
Index: core/src/main/scala/kafka/cluster/Replica.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Replica.scala	(revision 1362267)
+++ core/src/main/scala/kafka/cluster/Replica.scala	(working copy)
@@ -78,7 +78,7 @@
       case None =>
         isLocal match {
           case true =>
-            log.get.highwaterMark
+            log.get.getHW()
           case false => throw new IllegalStateException("Unable to get highwatermark for topic %s ".format(topic) +
             "partition %d on broker %d, since there is no local log for this partition"
               .format(partition.partitionId, brokerId))
Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(revision 1362267)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(working copy)
@@ -16,12 +16,12 @@
  */
 package kafka.cluster
 
-import kafka.common.NoLeaderForPartitionException
 import kafka.utils.{SystemTime, Time, Logging}
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZkUtils._
 import java.util.concurrent.locks.ReentrantLock
 import java.lang.IllegalStateException
+import kafka.common.LeaderNotAvailableException
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@@ -74,13 +74,14 @@
 
   def leaderReplica(): Replica = {
     val leaderReplicaId = leaderId()
+    trace("Returning leader replica for leader " + leaderReplicaId)
     if(leaderReplicaId.isDefined) {
       val leaderReplica = assignedReplicas().find(_.brokerId == leaderReplicaId.get)
       if(leaderReplica.isDefined) leaderReplica.get
       else throw new IllegalStateException("No replica for leader %d in the replica manager"
         .format(leaderReplicaId.get))
     }else
-      throw new NoLeaderForPartitionException("Leader for topic %s partition %d does not exist"
+      throw new LeaderNotAvailableException("Leader for topic %s partition %d does not exist"
         .format(topic, partitionId))
   }
 
@@ -134,7 +135,7 @@
           case None => throw new IllegalStateException("ISR update failed. No replica for id %d".format(r))
         }
       }
-      info("Updated ISR for for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(",")))
+      info("Updated ISR for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(",")))
     }catch {
       case e => throw new IllegalStateException("Failed to update ISR for topic %s ".format(topic) +
         "partition %d to %s".format(partitionId, newISR.mkString(",")), e)
@@ -146,7 +147,7 @@
   private def updateISRInZk(newISR: Set[Int], zkClient: ZkClient) = {
     val replicaListAndEpochString = readDataMaybeNull(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString))
     if(replicaListAndEpochString == null) {
-      throw new NoLeaderForPartitionException(("Illegal partition state. ISR cannot be updated for topic " +
+      throw new LeaderNotAvailableException(("Illegal partition state. ISR cannot be updated for topic " +
         "%s partition %d since leader and ISR does not exist in ZK".format(topic, partitionId)))
     }
     else {
@@ -154,7 +155,7 @@
       val epoch = replicasAndEpochInfo.last
       updatePersistentPath(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString),
         "%s;%s".format(newISR.mkString(","), epoch))
-      info("Updating ISR for for topic %s partition %d to %s in ZK".format(topic, partitionId, newISR.mkString(",")))
+      info("Updated ISR for topic %s partition %d to %s in ZK".format(topic, partitionId, newISR.mkString(",")))
     }
   }
 
Index: core/src/main/scala/kafka/producer/ProducerPool.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerPool.scala	(revision 1362267)
+++ core/src/main/scala/kafka/producer/ProducerPool.scala	(working copy)
@@ -20,7 +20,7 @@
 import kafka.cluster.Broker
 import java.util.Properties
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZkUtils, Utils, Logging}
+import kafka.utils.{ZkUtils, Logging}
 import collection.mutable.HashMap
 import java.lang.Object
 import kafka.common.{UnavailableProducerException, NoBrokersForPartitionException}
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1362267)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -22,7 +22,7 @@
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
-import kafka.common.MessageSizeTooLargeException
+import kafka.common.{ErrorMapping, MessageSizeTooLargeException}
 
 object SyncProducer {
   val RequestKey: Short = 0
@@ -114,6 +114,8 @@
   def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
     val response = doSend(request)
     val topicMetaDataResponse = TopicMetaDataResponse.readFrom(response.buffer)
+    // try to throw exception based on global error codes
+    ErrorMapping.maybeThrowException(topicMetaDataResponse.errorCode)
     topicMetaDataResponse.topicsMetadata
   }
 
Index: core/src/main/scala/kafka/producer/SyncProducerConfig.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(revision 1362267)
+++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(working copy)
@@ -36,7 +36,7 @@
   val connectTimeoutMs = Utils.getInt(props, "connect.timeout.ms", 5000)
 
   /** the socket timeout for network requests */
-  val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", 30000)  
+  val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", 3000)
 
   val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
 
Index: core/src/main/scala/kafka/producer/ProducerConfig.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerConfig.scala	(revision 1362267)
+++ core/src/main/scala/kafka/producer/ProducerConfig.scala	(working copy)
@@ -87,5 +87,5 @@
    */
   val producerRetries = Utils.getInt(props, "producer.num.retries", 3)
 
-  val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 5)
+  val producerRetryBackoffMs = Utils.getInt(props, "producer.retry.backoff.ms", 100)
 }
Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(revision 1362267)
+++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(working copy)
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package kafka.producer
 
 import collection.mutable.HashMap
@@ -21,6 +21,7 @@
 import java.lang.IllegalStateException
 import kafka.utils.Logging
 import kafka.cluster.{Replica, Partition}
+import kafka.common.{LeaderNotAvailableException, ErrorMapping, UnknownTopicException}
 
 class BrokerPartitionInfo(producerPool: ProducerPool) extends Logging {
   val topicPartitionInfo = new HashMap[String, TopicMetadata]()
@@ -31,7 +32,7 @@
    * @param topic the topic for which this information is to be returned
    * @return a sequence of (brokerId, numPartitions). Returns a zero-length
    * sequence if no brokers are available.
-   */  
+   */
   def getBrokerPartitionInfo(topic: String): Seq[Partition] = {
     debug("Getting broker partition info for topic %s".format(topic))
     // check if the cache has metadata for this topic
@@ -41,8 +42,7 @@
         case Some(m) => m
         case None =>
           // refresh the topic metadata cache
-          info("Fetching metadata for topic %s".format(topic))
-          updateInfo(topic)
+          updateInfo(List(topic))
           val topicMetadata = topicPartitionInfo.get(topic)
           topicMetadata match {
             case Some(m) => m
@@ -69,24 +69,33 @@
    * It updates the cache by issuing a get topic metadata request to a random broker.
    * @param topic the topic for which the metadata is to be fetched
    */
-  def updateInfo(topic: String = null) = {
+  def updateInfo(topics: Seq[String] = Seq.empty[String]) = {
     val producer = producerPool.getAnyProducer
-    if(topic != null) {
+    val topicList = if(topics.size > 0) topics else topicPartitionInfo.keySet.toList
+    topicList.foreach { topic =>
+      info("Fetching metadata for topic %s".format(topic))
       val topicMetadataRequest = new TopicMetadataRequest(List(topic))
-      val topicMetadataList = producer.send(topicMetadataRequest)
-      val topicMetadata:Option[TopicMetadata] = if(topicMetadataList.size > 0) Some(topicMetadataList.head) else None
+      var topicMetaDataResponse: Seq[TopicMetadata] = Nil
+      try {
+        topicMetaDataResponse = producer.send(topicMetadataRequest)
+        // throw topic specific exception
+        topicMetaDataResponse.foreach(metadata => ErrorMapping.maybeThrowException(metadata.errorCode))
+        // throw partition specific exception
+        topicMetaDataResponse.foreach(metadata =>
+          metadata.partitionsMetadata.foreach(partitionMetadata => ErrorMapping.maybeThrowException(partitionMetadata.errorCode)))
+      }catch {
+        case te: UnknownTopicException => throw te
+        case e: LeaderNotAvailableException => throw e
+        case oe => warn("Ignoring non leader related error while fetching metadata", oe)  // swallow non leader related errors
+      }
+      val topicMetadata:Option[TopicMetadata] = if(topicMetaDataResponse.size > 0) Some(topicMetaDataResponse.head) else None
       topicMetadata match {
         case Some(metadata) =>
           info("Fetched metadata for topics %s".format(topic))
+          topicMetadata.foreach(metadata => trace("Metadata for topic %s is %s".format(metadata.topic, metadata.toString)))
           topicPartitionInfo += (topic -> metadata)
         case None =>
       }
-    }else {
-      // refresh cache for all topics
-      val topics = topicPartitionInfo.keySet.toList
-      val topicMetadata = producer.send(new TopicMetadataRequest(topics))
-      topicMetadata.foreach(metadata => topicPartitionInfo += (metadata.topic -> metadata))
-
     }
   }
 }
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1362267)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -50,7 +50,8 @@
         if (outstandingProduceRequests.size > 0)  {
           // back off and update the topic metadata cache before attempting another send operation
           Thread.sleep(config.producerRetryBackoffMs)
-          Utils.swallowError(brokerPartitionInfo.updateInfo())
+          // get topics of the outstanding produce requests and refresh metadata for those
+          Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))
           remainingRetries -= 1
         }
       }
@@ -62,66 +63,78 @@
   }
 
   private def dispatchSerializedData(messages: Seq[ProducerData[K,Message]]): Seq[ProducerData[K, Message]] = {
-    val partitionedData = partitionAndCollate(messages)
-    val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
-    try {
-      for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
-        if (logger.isTraceEnabled)
-          eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
-            .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
-        val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
+    val partitionedDataOpt = partitionAndCollate(messages)
+    partitionedDataOpt match {
+      case Some(partitionedData) =>
+        val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
+        try {
+          for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
+            if (logger.isTraceEnabled)
+              eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
+                .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+            val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
 
-        val failedTopicPartitions = send(brokerid, messageSetPerBroker)
-        for( (topic, partition) <- failedTopicPartitions ) {
-          eventsPerBrokerMap.get((topic, partition)) match {
-            case Some(data) => failedProduceRequests.appendAll(data)
-            case None => // nothing
+            val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+            for( (topic, partition) <- failedTopicPartitions ) {
+              eventsPerBrokerMap.get((topic, partition)) match {
+                case Some(data) => failedProduceRequests.appendAll(data)
+                case None => // nothing
+              }
+            }
           }
+        } catch {
+          case t: Throwable => error("Failed to send messages")
         }
-      }
-    } catch {
-      case t: Throwable => error("Failed to send messages")
+        failedProduceRequests
+      case None => // all produce requests failed
+        messages
     }
-    failedProduceRequests
   }
 
   def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
     events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m))))
   }
 
-  def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] = {
+  def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = {
     val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]
-    for (event <- events) {
-      val topicPartitionsList = getPartitionListForTopic(event)
-      val totalNumPartitions = topicPartitionsList.length
+    try {
+      for (event <- events) {
+        val topicPartitionsList = getPartitionListForTopic(event)
+        val totalNumPartitions = topicPartitionsList.length
 
-      val partitionIndex = getPartition(event.getKey, totalNumPartitions)
-      val brokerPartition = topicPartitionsList(partitionIndex)
+        val partitionIndex = getPartition(event.getKey, totalNumPartitions)
+        val brokerPartition = topicPartitionsList(partitionIndex)
 
-      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
-      val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
+        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
+        val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
 
-      var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
-      ret.get(leaderBrokerId) match {
-        case Some(element) =>
-          dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
-        case None =>
-          dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
-          ret.put(leaderBrokerId, dataPerBroker)
-      }
+        var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
+        ret.get(leaderBrokerId) match {
+          case Some(element) =>
+            dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
+          case None =>
+            dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
+            ret.put(leaderBrokerId, dataPerBroker)
+        }
 
-      val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
-      var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
-      dataPerBroker.get(topicAndPartition) match {
-        case Some(element) =>
-          dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
-        case None =>
-          dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
-          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
+        val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
+        var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
+        dataPerBroker.get(topicAndPartition) match {
+          case Some(element) =>
+            dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
+          case None =>
+            dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
+            dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
+        }
+        dataPerTopicPartition.append(event)
       }
-      dataPerTopicPartition.append(event)
+      Some(ret)
+    }catch {
+      case e: InvalidPartitionException => throw e
+      case nbe: NoBrokersForPartitionException => throw nbe
+      case oe => error("Failed to collate messages by topic, partition", oe)
+                 None
     }
-    ret
   }
 
   private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = {
@@ -144,12 +157,12 @@
   private def getPartition(key: K, numPartitions: Int): Int = {
     if(numPartitions <= 0)
       throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
-              "\n Valid values are > 0")
+        "\n Valid values are > 0")
     val partition = if(key == null) Utils.getNextRandomInt(numPartitions)
-                    else partitioner.partition(key, numPartitions)
+    else partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
       throw new InvalidPartitionException("Invalid partition id : " + partition +
-              "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
+        "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
     partition
   }
 
@@ -204,46 +217,46 @@
      */
 
     val messagesPerTopicPartition = eventsPerTopicAndPartition.map { e =>
-      {
-        val topicAndPartition = e._1
-        val produceData = e._2
-        val messages = new ListBuffer[Message]
-        produceData.map(p => messages.appendAll(p.getData))
+    {
+      val topicAndPartition = e._1
+      val produceData = e._2
+      val messages = new ListBuffer[Message]
+      produceData.map(p => messages.appendAll(p.getData))
 
-        ( topicAndPartition,
-          config.compressionCodec match {
-            case NoCompressionCodec =>
-              trace("Sending %d messages with no compression to topic %s on partition %d"
-                  .format(messages.size, topicAndPartition._1, topicAndPartition._2))
-              new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
-            case _ =>
-              config.compressedTopics.size match {
-                case 0 =>
+      ( topicAndPartition,
+        config.compressionCodec match {
+          case NoCompressionCodec =>
+            trace("Sending %d messages with no compression to topic %s on partition %d"
+              .format(messages.size, topicAndPartition._1, topicAndPartition._2))
+            new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+          case _ =>
+            config.compressedTopics.size match {
+              case 0 =>
+                trace("Sending %d messages with compression codec %d to topic %s on partition %d"
+                  .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+                new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+              case _ =>
+                if(config.compressedTopics.contains(topicAndPartition._1)) {
                   trace("Sending %d messages with compression codec %d to topic %s on partition %d"
-                      .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+                    .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
                   new ByteBufferMessageSet(config.compressionCodec, messages: _*)
-                case _ =>
-                  if(config.compressedTopics.contains(topicAndPartition._1)) {
-                    trace("Sending %d messages with compression codec %d to topic %s on partition %d"
-                        .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
-                    new ByteBufferMessageSet(config.compressionCodec, messages: _*)
-                  }
-                  else {
-                    trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
-                        .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1,
-                        config.compressedTopics.toString))
-                    new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
-                  }
-              }
-          }
+                }
+                else {
+                  trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
+                    .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1,
+                    config.compressedTopics.toString))
+                  new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+                }
+            }
+        }
         )
-      }
     }
+    }
     messagesPerTopicPartition
   }
 
   def close() {
     if (producerPool != null)
-      producerPool.close    
+      producerPool.close
   }
 }
Index: core/src/main/scala/kafka/message/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/FileMessageSet.scala	(revision 1362267)
+++ core/src/main/scala/kafka/message/FileMessageSet.scala	(working copy)
@@ -55,13 +55,10 @@
     }
     else {
       setSize.set(channel.size())
-      setHighWaterMark.set(sizeInBytes)
       channel.position(channel.size)
     }
   } else {
     setSize.set(scala.math.min(channel.size(), limit) - offset)
-    setHighWaterMark.set(sizeInBytes)
-    debug("initializing high water mark in immutable mode: " + highWaterMark)
   }
   
   /**
@@ -93,7 +90,7 @@
    * Return a message set which is a view into this set starting from the given offset and with the given size limit.
    */
   def read(readOffset: Long, size: Long): FileMessageSet = {
-    new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark),
+    new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, sizeInBytes()),
       false, new AtomicBoolean(false))
   }
   
@@ -140,11 +137,6 @@
    * The number of bytes taken up by this file set
    */
   def sizeInBytes(): Long = setSize.get()
-  
-  /**
-    * The high water mark
-    */
-  def highWaterMark(): Long = setHighWaterMark.get()
 
   def getEndOffset(): Long = offset + sizeInBytes()
 
@@ -174,8 +166,7 @@
     val elapsedTime = SystemTime.milliseconds - startTime
     LogFlushStats.recordFlushRequest(elapsedTime)
     debug("flush time " + elapsedTime)
-    setHighWaterMark.set(sizeInBytes)
-    debug("flush high water mark:" + highWaterMark)
+    debug("flush size:" + sizeInBytes())
   }
   
   /**
@@ -203,8 +194,7 @@
     } while(next >= 0)
     channel.truncate(validUpTo)
     setSize.set(validUpTo)
-    setHighWaterMark.set(validUpTo)
-    info("recover high water mark:" + highWaterMark)
+    info("recover upto size:" + sizeInBytes())
     /* This should not be necessary, but fixes bug 6191269 on some OSs. */
     channel.position(validUpTo)
     needRecover.set(false)    
@@ -214,7 +204,6 @@
   def truncateUpto(hw: Long) = {
     channel.truncate(hw)
     setSize.set(hw)
-    setHighWaterMark.set(hw)
   }
 
   /**
Index: core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
===================================================================
--- core/src/main/scala/kafka/common/LeaderNotAvailableException.scala	(revision 0)
+++ core/src/main/scala/kafka/common/LeaderNotAvailableException.scala	(revision 0)
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a request is made for partition, but no leader exists for that partition
+ */
+class LeaderNotAvailableException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/common/ErrorMapping.scala
===================================================================
--- core/src/main/scala/kafka/common/ErrorMapping.scala	(revision 1362267)
+++ core/src/main/scala/kafka/common/ErrorMapping.scala	(working copy)
@@ -35,10 +35,11 @@
   val InvalidPartitionCode : Short = 3
   val InvalidFetchSizeCode  : Short = 4
   val InvalidFetchRequestFormatCode : Short = 5
-  val NoLeaderForPartitionCode : Short = 6
+  val LeaderNotAvailableCode : Short = 6
   val NotLeaderForPartitionCode : Short = 7
   val UnknownTopicCode : Short = 8
   val RequestTimedOutCode: Short = 9
+  val ReplicaNotAvailableCode: Short = 10
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -48,8 +49,9 @@
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
       classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
-      classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
+      classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
+      classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
       classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
     ).withDefaultValue(UnknownCode)
   
@@ -64,6 +66,8 @@
       throw codeToException(code).newInstance()
 
   def exceptionFor(code: Short) : Throwable = codeToException(code).newInstance()
+
+  def getMaxErrorCodeValue = exceptionToCode.map(_._2).max
 }
 
 class InvalidTopicException(message: String) extends RuntimeException(message) {
Index: core/src/main/scala/kafka/common/NoLeaderForPartitionException.scala
===================================================================
--- core/src/main/scala/kafka/common/NoLeaderForPartitionException.scala	(revision 1362267)
+++ core/src/main/scala/kafka/common/NoLeaderForPartitionException.scala	(working copy)
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Thrown when a request is made for partition, but no leader exists for that partition
- */
-class NoLeaderForPartitionException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
\ No newline at end of file
Index: core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala
===================================================================
--- core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala	(revision 0)
+++ core/src/main/scala/kafka/common/ReplicaNotAvailableException.scala	(revision 0)
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a request is made for partition, but no leader exists for that partition
+ */
+class ReplicaNotAvailableException(cause: Throwable) extends RuntimeException(cause) {
+  def this() = this(null)
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/admin/AdminUtils.scala
===================================================================
--- core/src/main/scala/kafka/admin/AdminUtils.scala	(revision 1362267)
+++ core/src/main/scala/kafka/admin/AdminUtils.scala	(working copy)
@@ -24,6 +24,7 @@
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection.mutable
+import kafka.common.{LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -48,15 +49,15 @@
    * p7        p8        p9        p5        p6       (3nd replica)
    */
   def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
-          fixedStartIndex: Int = -1)  // for testing only
-    : Map[Int, List[String]] = {
+                              fixedStartIndex: Int = -1)  // for testing only
+  : Map[Int, List[String]] = {
     if (nPartitions <= 0)
       throw new AdministrationException("number of partitions must be larger than 0")
     if (replicationFactor <= 0)
       throw new AdministrationException("replication factor must be larger than 0")
     if (replicationFactor > brokerList.size)
       throw new AdministrationException("replication factor: " + replicationFactor +
-              " larger than available brokers: " + brokerList.size)
+        " larger than available brokers: " + brokerList.size)
     val ret = new mutable.HashMap[Int, List[String]]()
     val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
 
@@ -85,7 +86,7 @@
     }
   }
 
-  def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[Option[TopicMetadata]] = {
+  def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[TopicMetadata] = {
     val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
     topics.map { topic =>
       if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
@@ -99,15 +100,42 @@
           val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
           debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
 
-          new PartitionMetadata(partition,
-            leader.map(l => getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head),
-            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)),
-            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas),
-            None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+          var leaderInfo: Option[Broker] = None
+          var replicaInfo: Seq[Broker] = Nil
+          var isrInfo: Seq[Broker] = Nil
+          try {
+            try {
+              leaderInfo = leader match {
+                case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
+                case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
+              }
+            }catch {
+              case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d"
+                .format(topic, partition))
+            }
+
+            try {
+              replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
+              isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
+            }catch {
+              case e => throw new ReplicaNotAvailableException(e)
+            }
+
+            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError,
+              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+          }catch {
+            case e: ReplicaNotAvailableException => new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
+              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
+              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+            case le: LeaderNotAvailableException => new PartitionMetadata(partition, None, replicaInfo, isrInfo,
+              ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]),
+              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+          }
         }
-        Some(new TopicMetadata(topic, partitionMetadata))
+        new TopicMetadata(topic, partitionMetadata)
       } else {
-        None
+        // topic doesn't exist, send appropriate error code
+        new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicCode)
       }
     }
   }
@@ -120,9 +148,14 @@
       optionalBrokerInfo match {
         case Some(brokerInfo) => brokerInfo // return broker info from the cache
         case None => // fetch it from zookeeper
-          val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(id)).head
-          cachedBrokerInfo += (id -> brokerInfo)
-          brokerInfo
+          try {
+            val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(id)).head
+            cachedBrokerInfo += (id -> brokerInfo)
+            brokerInfo
+          }catch {
+            case e => error("Failed to fetch broker info for broker id " + id)
+            throw e
+          }
       }
     }
   }
Index: core/src/main/scala/kafka/admin/ListTopicCommand.scala
===================================================================
--- core/src/main/scala/kafka/admin/ListTopicCommand.scala	(revision 1362267)
+++ core/src/main/scala/kafka/admin/ListTopicCommand.scala	(working copy)
@@ -20,6 +20,7 @@
 import joptsimple.OptionParser
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
+import kafka.common.ErrorMapping
 
 object ListTopicCommand {
 
@@ -77,12 +78,12 @@
 
   def showTopic(topic: String, zkClient: ZkClient) {
     val topicMetaData = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-    topicMetaData match {
-      case None =>
+    topicMetaData.errorCode match {
+      case ErrorMapping.UnknownTopicCode =>
         println("topic " + topic + " doesn't exist!")
-      case Some(tmd) =>
+      case _ =>
         println("topic: " + topic)
-        for (part <- tmd.partitionsMetadata)
+        for (part <- topicMetaData.partitionsMetadata)
           println(part.toString)
     }
   }
Index: core/src/main/scala/kafka/network/SocketServer.scala
===================================================================
--- core/src/main/scala/kafka/network/SocketServer.scala	(revision 1362267)
+++ core/src/main/scala/kafka/network/SocketServer.scala	(working copy)
@@ -49,7 +49,7 @@
   def startup() {
     for(i <- 0 until numProcessorThreads) {
       processors(i) = new Processor(i, time, maxRequestSize, requestChannel, stats)
-      Utils.newThread("kafka-processor-" + i, processors(i), false).start()
+      Utils.newThread("kafka-processor-%d-%d".format(port, i), processors(i), false).start()
     }
     // register the processor threads for notification of responses
     requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
@@ -66,6 +66,7 @@
   def shutdown() = {
     info("Shutting down socket server")
     acceptor.shutdown
+    info("Shut down acceptor completed")
     for(processor <- processors)
       processor.shutdown
     info("Shut down socket server.")
@@ -238,18 +239,19 @@
 
   private def processNewResponses() {
     var curr = requestChannel.receiveResponse(id)
-    while(curr != null) {
+    while(isRunning && curr != null) {
       trace("Socket server received response to send, registering for write: " + curr)
       val key = curr.requestKey.asInstanceOf[SelectionKey]
       try {
         key.interestOps(SelectionKey.OP_WRITE)
         key.attach(curr.response)
-        curr = requestChannel.receiveResponse(id)
       } catch {
         case e: CancelledKeyException => {
           debug("Ignoring response for closed socket.")
           close(key)
         }
+      }finally {
+        curr = requestChannel.receiveResponse(id)
       }
     }
   }
Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala	(revision 1362267)
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala	(working copy)
@@ -156,16 +156,15 @@
       }
     })
 
-    val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
-    val iter = if(maxMessages >= 0)
-      stream.slice(0, maxMessages)
-    else
-      stream
-
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
-
     try {
+      val stream = connector.createMessageStreamsByFilter(filterSpec).get(0)
+      val iter = if(maxMessages >= 0)
+        stream.slice(0, maxMessages)
+      else
+        stream
+
       for(messageAndTopic <- iter) {
         try {
           formatter.writeTo(messageAndTopic.message, System.out)
@@ -176,7 +175,7 @@
             else
               throw e
         }
-        if(System.out.checkError()) { 
+        if(System.out.checkError()) {
           // This means no one is listening to our output stream any more, time to shutdown
           System.err.println("Unable to write to standard out, closing consumer.")
           formatter.close()
@@ -186,11 +185,11 @@
       }
     } catch {
       case e => error("Error processing message, stopping consumer: ", e)
+    }finally {
+      System.out.flush()
+      formatter.close()
+      connector.shutdown()
     }
-      
-    System.out.flush()
-    formatter.close()
-    connector.shutdown()
   }
 
   def tryParse(parser: OptionParser, args: Array[String]) = {
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1362267)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -18,7 +18,6 @@
 package kafka.utils
 
 import java.util.Properties
-import java.util.concurrent.locks.Condition
 import kafka.cluster.{Broker, Cluster}
 import kafka.common.NoEpochForPartitionException
 import kafka.consumer.TopicCount
@@ -27,6 +26,7 @@
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
 import util.parsing.json.JSON
+import java.util.concurrent.locks.{ReentrantLock, Condition}
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -481,18 +481,29 @@
 
 }
 
-class LeaderExists(topic: String, partition: Int, leaderExists: Condition) extends IZkDataListener {
+class LeaderExists(topic: String, partition: Int, leaderLock: ReentrantLock, leaderExists: Condition) extends IZkDataListener {
   @throws(classOf[Exception])
   def handleDataChange(dataPath: String, data: Object) {
     val t = dataPath.split("/").takeRight(3).head
     val p = dataPath.split("/").takeRight(2).head.toInt
-    if(t == topic && p == partition)
-      leaderExists.signal()
+    leaderLock.lock()
+    try {
+      if(t == topic && p == partition)
+        leaderExists.signal()
+    }
+    finally {
+      leaderLock.unlock()
+    }
   }
 
   @throws(classOf[Exception])
   def handleDataDeleted(dataPath: String) {
-    leaderExists.signal()
+    leaderLock.lock()
+    try {
+      leaderExists.signal()
+    }finally {
+      leaderLock.unlock()
+    }
   }
 
 }
Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(revision 1362267)
+++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(working copy)
@@ -25,7 +25,7 @@
 import kafka.admin.AdminUtils
 import java.lang.{Thread, IllegalStateException}
 import collection.mutable.HashSet
-import kafka.common.{InvalidPartitionException, NoLeaderForPartitionException, NotLeaderForPartitionException, KafkaZookeeperClient}
+import kafka.common._
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -34,24 +34,23 @@
  *
  */
 class KafkaZooKeeper(config: KafkaConfig,
+                     zkClient: ZkClient,
                      addReplicaCbk: (String, Int, Set[Int]) => Replica,
                      getReplicaCbk: (String, Int) => Option[Replica],
                      becomeLeader: (Replica, Seq[Int]) => Unit,
                      becomeFollower: (Replica, Int, ZkClient) => Unit) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
-  private var zkClient: ZkClient = null
-  private val leaderChangeListener = new LeaderChangeListener
-  private val topicPartitionsChangeListener = new TopicChangeListener
+  private var leaderChangeListener: LeaderChangeListener = null
+  private var topicPartitionsChangeListener: TopicChangeListener = null
   private var stateChangeHandler: StateChangeCommandHandler = null
 
   private val topicListenerLock = new Object
   private val leaderChangeLock = new Object
 
   def startup() {
-    /* start client */
-    info("connecting to ZK: " + config.zkConnect)
-    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    leaderChangeListener = new LeaderChangeListener
+    topicPartitionsChangeListener = new TopicChangeListener
     startStateChangeCommandHandler()
     zkClient.subscribeStateChanges(new SessionExpireListener)
     registerBrokerInZk()
@@ -104,11 +103,7 @@
   }
 
   def close() {
-    if (zkClient != null) {
-      stateChangeHandler.shutdown()
-      info("Closing zookeeper client...")
-      zkClient.close()
-    }
+    stateChangeHandler.shutdown()
   }
 
   def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
@@ -121,10 +116,10 @@
     ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
       case Some(leader) =>
         if(leader != config.brokerId)
-          throw new NotLeaderForPartitionException("Broker %d is not leader for partition %d for topic %s"
+          throw new LeaderNotAvailableException("Broker %d is not leader for partition %d for topic %s"
             .format(config.brokerId, partition, topic))
       case None =>
-        throw new NoLeaderForPartitionException("There is no leader for topic %s partition %d".format(topic, partition))
+        throw new LeaderNotAvailableException("There is no leader for topic %s partition %d".format(topic, partition))
     }
   }
 
@@ -339,7 +334,8 @@
       val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
       val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
       val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
-      debug("Leader change listener fired for path %s. New leader is %d. New epoch is %d".format(dataPath, newLeader, newEpoch))
+      debug("Leader change listener fired on broker %d for path %s. New leader is %d. New epoch is %d".format(config.brokerId,
+        dataPath, newLeader, newEpoch))
       val topicPartitionInfo = dataPath.split("/")
       val topic = topicPartitionInfo.takeRight(4).head
       val partition = topicPartitionInfo.takeRight(2).head.toInt
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1362267)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -113,7 +113,7 @@
   * leader election on all replicas minus the preferred replica */
   val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
 
-  val keepInSyncTimeMs = Utils.getLong(props, "isr.in.sync.time.ms", 30000)
+  val keepInSyncTimeMs = Utils.getLong(props, "isr.in.sync.time.ms", 10000)
 
   val keepInSyncBytes = Utils.getLong(props, "isr.in.sync.bytes", 4000)
 
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1362267)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -25,6 +25,7 @@
 import atomic.AtomicBoolean
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.ZkClient
+import kafka.common.KafkaZookeeperClient
 
 
 /**
@@ -44,6 +45,7 @@
   private var replicaManager: ReplicaManager = null
   private var apis: KafkaApis = null
   var kafkaController: KafkaController = new KafkaController(config)
+  var zkClient: ZkClient = null
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
@@ -59,6 +61,9 @@
       needRecovery = false
       cleanShutDownFile.delete
     }
+    /* start client */
+    info("connecting to ZK: " + config.zkConnect)
+    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
     logManager = new LogManager(config,
                                 SystemTime,
                                 1000L * 60 * config.logCleanupIntervalMinutes,
@@ -72,9 +77,9 @@
                                     config.maxSocketRequestSize)
     Utils.registerMBean(socketServer.stats, statsMBeanName)
 
-    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica, makeLeader, makeFollower)
+    kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower)
 
-    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient)
+    replicaManager = new ReplicaManager(config, time, zkClient)
 
     apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
     requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
@@ -118,6 +123,8 @@
         kafkaController.shutDown()
 
       kafkaZookeeper.close
+      info("Closing zookeeper client...")
+      zkClient.close()
 
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
       debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 1362267)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(working copy)
@@ -22,10 +22,10 @@
 import kafka.message.ByteBufferMessageSet
 
 class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
-        extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
-          socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
-          fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
-          minBytes = brokerConfig.replicaMinBytes) {
+  extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
+    socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
+    fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
+    minBytes = brokerConfig.replicaMinBytes) {
 
   // process fetched data
   def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
@@ -35,10 +35,15 @@
 
     if (fetchOffset != replica.logEndOffset())
       throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset()))
+    trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d".format(replica.brokerId,
+      replica.logEndOffset(), messageSet.sizeInBytes, partitionData.hw))
     replica.log.get.append(messageSet)
-    replica.highWatermark(Some(partitionData.hw))
-    trace("follower %d set replica highwatermark for topic %s partition %d to %d"
-          .format(replica.brokerId, topic, partitionId, partitionData.hw))
+    trace("Follower %d has replica log end offset %d after appending %d messages"
+      .format(replica.brokerId, replica.logEndOffset(), messageSet.sizeInBytes))
+    val followerHighWatermark = replica.logEndOffset().min(partitionData.hw)
+    replica.highWatermark(Some(followerHighWatermark))
+    trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
+      .format(replica.brokerId, topic, partitionId, followerHighWatermark))
   }
 
   // handle a partition whose offset is out of range and return a new fetch offset
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1362267)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -136,15 +136,17 @@
     if(newHw > oldHw) {
       debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, newHw))
       partition.leaderHW(Some(newHw))
-    }
+    }else
+      debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s".format(replica.topic,
+        replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
   }
 
   def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
-    // stop replica fetcher thread, if any
-    replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
     // read and cache the ISR
     replica.partition.leaderId(Some(replica.brokerId))
     replica.partition.updateISR(currentISRInZk.toSet)
+    // stop replica fetcher thread, if any
+    replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
     // also add this partition to the list of partitions for which the leader is the current broker
     try {
       leaderReplicaLock.lock()
@@ -157,6 +159,8 @@
   def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
     info("broker %d intending to follow leader %d for topic %s partition %d"
       .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+    // set the leader for this partition correctly on this broker
+    replica.partition.leaderId(Some(leaderBrokerId))
     // remove this replica's partition from the ISR expiration queue
     try {
       leaderReplicaLock.lock()
@@ -187,7 +191,6 @@
     try {
       info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
         .format(config.keepInSyncTimeMs))
-
       leaderReplicaLock.lock()
       leaderReplicas.foreach { partition =>
          // shrink ISR if a follower is slow or stuck
@@ -230,6 +233,7 @@
           // update ISR in ZK and cache
           replica.partition.updateISR(newISR.map(_.brokerId), Some(zkClient))
         }
+        debug("Recording follower %d position %d for topic %s partition %d".format(replicaId, offset, topic, partition))
         maybeIncrementLeaderHW(replica)
       case None =>
         throw new IllegalStateException("No replica %d in replica manager on %d".format(replicaId, config.brokerId))
@@ -247,7 +251,9 @@
   }
 
   def close() {
+    info("Closing replica manager on broker " + config.brokerId)
     isrExpirationScheduler.shutdown()
     replicaFetcherManager.shutdown()
+    info("Replica manager shutdown on broker " + config.brokerId)
   }
 }
Index: core/src/main/scala/kafka/server/KafkaController.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaController.scala	(revision 1362267)
+++ core/src/main/scala/kafka/server/KafkaController.scala	(working copy)
@@ -134,11 +134,15 @@
 
   def removeBroker(brokerId: Int){
     brokers.remove(brokerId)
-    messageChannels(brokerId).disconnect()
-    messageChannels.remove(brokerId)
-    messageQueues.remove(brokerId)
-    messageThreads(brokerId).shutDown()
-    messageThreads.remove(brokerId)
+    try {
+      messageChannels(brokerId).disconnect()
+      messageChannels.remove(brokerId)
+      messageQueues.remove(brokerId)
+      messageThreads(brokerId).shutDown()
+      messageThreads.remove(brokerId)
+    }catch {
+      case e => error("Error while shutting down controller", e)
+    }
   }
 }
 
@@ -189,10 +193,10 @@
   }
 
   def shutDown() = {
-    if(controllerChannelManager != null){
+    if(controllerChannelManager != null)
       controllerChannelManager.shutDown()
-    }
-    zkClient.close()
+    if(zkClient != null)
+      zkClient.close()
   }
 
   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1362267)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -48,8 +48,8 @@
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
-  def handle(request: RequestChannel.Request) { 
-    val apiId = request.request.buffer.getShort() 
+  def handle(request: RequestChannel.Request) {
+    val apiId = request.request.buffer.getShort()
     apiId match {
       case RequestKeys.Produce => handleProducerRequest(request)
       case RequestKeys.Fetch => handleFetchRequest(request)
@@ -113,6 +113,7 @@
     val sTime = SystemTime.milliseconds
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Producer request " + request.toString)
+    trace("Broker %s received produce request %s".format(logManager.config.brokerId, produceRequest.toString))
 
     val response = produceToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
@@ -203,7 +204,7 @@
     val fetchRequest = FetchRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Fetch request " + fetchRequest.toString)
-
+    trace("Broker %s received fetch request %s".format(logManager.config.brokerId, fetchRequest.toString))
     // validate the request
     try {
       fetchRequest.validate()
@@ -245,7 +246,7 @@
       fetchRequestPurgatory.watch(delayedFetch)
     }
   }
-    
+
   /**
    * Calculate the number of available bytes for the given fetch request
    */
@@ -257,7 +258,7 @@
           debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
           val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
           val available = maybeLog match {
-            case Some(log) => max(0, log.highwaterMark - offsetDetail.offsets(i))
+            case Some(log) => max(0, log.getHW() - offsetDetail.offsets(i))
             case None => 0
           }
           totalBytes += math.min(offsetDetail.fetchSizes(i), available)
@@ -321,6 +322,8 @@
                 val replica = replicaOpt.get
                 debug("Leader %d for topic %s partition %d received fetch request from follower %d"
                   .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
+                  .format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
                 new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
             }
         }
@@ -374,36 +377,46 @@
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     val config = logManager.getServerConfig
     val zkClient = kafkaZookeeper.getZookeeperClient
-    val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+    var errorCode = ErrorMapping.NoError
 
-    metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
-      val topic = topicAndMetadata._1
-      topicAndMetadata._2 match {
-        case Some(metadata) => topicsMetadata += metadata
-        case None =>
-          /* check if auto creation of topics is turned on */
-          if(config.autoCreateTopics) {
-            CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-            info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
-              .format(topic, config.numPartitions, config.defaultReplicationFactor))
-            val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
-            newTopicMetadata match {
-              case Some(topicMetadata) => topicsMetadata += topicMetadata
-              case None =>
-                throw new IllegalStateException("Topic metadata for automatically created topic %s does not exist".format(topic))
+    try {
+      val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+
+      metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
+        val topic = topicAndMetadata._1
+        topicAndMetadata._2.errorCode match {
+          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
+          case ErrorMapping.UnknownTopicCode =>
+            /* check if auto creation of topics is turned on */
+            if(config.autoCreateTopics) {
+              CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                .format(topic, config.numPartitions, config.defaultReplicationFactor))
+              val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+              newTopicMetadata.errorCode match {
+                case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
+                case _ =>
+                  throw new IllegalStateException("Topic metadata for automatically created topic %s does not exist".format(topic))
+              }
             }
-          }
+          case _ => error("Error while fetching topic metadata for topic " + topic,
+            ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
+        }
       }
+    }catch {
+      case e => error("Error while retrieving topic metadata", e)
+        // convert exception type to error code
+        errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
     }
-    info("Sending response for topic metadata request")
-    val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
+    topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
+    val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
   def close() {
     fetchRequestPurgatory.shutdown()
   }
-  
+
   /**
    * A delayed fetch request
    */
@@ -415,7 +428,7 @@
    * A holding pen for fetch requests waiting to be satisfied
    */
   class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData] {
-    
+
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
@@ -424,7 +437,7 @@
       val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
-    
+
     /**
      * When a request expires just answer it with whatever data is present
      */
@@ -514,7 +527,7 @@
                 numAcks, produce.requiredAcks,
                 topic, partitionId))
               if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
-                  (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
+                (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
                 /*
                  * requiredAcks < 0 means acknowledge after all replicas in ISR
                  * are fully caught up to the (local) leader's offset
@@ -555,7 +568,7 @@
 
       override def toString =
         "acksPending:%b, error: %d, requiredOffset: %d".format(
-        acksPending, error, requiredOffset
+          acksPending, error, requiredOffset
         )
     }
   }
Index: core/src/main/scala/kafka/api/TopicMetaDataResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetaDataResponse.scala	(revision 1362267)
+++ core/src/main/scala/kafka/api/TopicMetaDataResponse.scala	(working copy)
@@ -24,8 +24,8 @@
 object TopicMetaDataResponse {
 
   def readFrom(buffer: ByteBuffer): TopicMetaDataResponse = {
+    val versionId = buffer.getShort
     val errorCode = buffer.getShort
-    val versionId = buffer.getShort
 
     val topicCount = buffer.getInt
     val topicsMetadata = new Array[TopicMetadata](topicCount)
Index: core/src/main/scala/kafka/api/TopicMetadata.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetadata.scala	(revision 1362267)
+++ core/src/main/scala/kafka/api/TopicMetadata.scala	(working copy)
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer
 import kafka.utils.Utils._
 import collection.mutable.ListBuffer
+import kafka.common.ErrorMapping
 
 /**
  * topic (2 bytes + topic.length)
@@ -57,24 +58,28 @@
 object TopicMetadata {
 
   def readFrom(buffer: ByteBuffer): TopicMetadata = {
+    val errorCode = getShortInRange(buffer, "error code", (-1, ErrorMapping.getMaxErrorCodeValue))
     val topic = readShortString(buffer)
     val numPartitions = getIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
     val partitionsMetadata = new ListBuffer[PartitionMetadata]()
     for(i <- 0 until numPartitions)
       partitionsMetadata += PartitionMetadata.readFrom(buffer)
-    new TopicMetadata(topic, partitionsMetadata)
+    new TopicMetadata(topic, partitionsMetadata, errorCode)
   }
 }
 
-case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata]) {
+case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata], errorCode: Short = ErrorMapping.NoError) {
   def sizeInBytes: Int = {
-    var size: Int = shortStringLength(topic)
+    var size: Int = 2   /* error code */
+    size += shortStringLength(topic)
     size += partitionsMetadata.foldLeft(4 /* number of partitions */)(_ + _.sizeInBytes)
     debug("Size of topic metadata = " + size)
     size
   }
 
   def writeTo(buffer: ByteBuffer) {
+    /* error code */
+    buffer.putShort(errorCode)
     /* topic */
     writeShortString(buffer, topic)
     /* number of partitions */
@@ -86,6 +91,7 @@
 object PartitionMetadata {
 
   def readFrom(buffer: ByteBuffer): PartitionMetadata = {
+    val errorCode = getShortInRange(buffer, "error code", (-1, ErrorMapping.getMaxErrorCodeValue))
     val partitionId = getIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
     val doesLeaderExist = getLeaderRequest(buffer.get)
     val leader = doesLeaderExist match {
@@ -129,7 +135,7 @@
         Some(new LogMetadata(numLogSegments, totalDataSize, segmentMetadata))
       case LogSegmentMetadataDoesNotExist => None
     }
-    new PartitionMetadata(partitionId, leader, replicas, isr, logMetadata)
+    new PartitionMetadata(partitionId, leader, replicas, isr, errorCode, logMetadata)
   }
 
   def getLeaderRequest(requestId: Byte): LeaderRequest = {
@@ -149,9 +155,9 @@
 }
 
 case class PartitionMetadata(partitionId: Int, leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
-                             logMetadata: Option[LogMetadata] = None) {
+                             errorCode: Short = ErrorMapping.NoError, logMetadata: Option[LogMetadata] = None) {
   def sizeInBytes: Int = {
-    var size: Int = 4 /* partition id */ + 1 /* if leader exists*/
+    var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/
 
     leader match {
       case Some(l) => size += l.sizeInBytes
@@ -173,6 +179,7 @@
   }
 
   def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(errorCode)
     buffer.putInt(partitionId)
 
     /* if leader exists*/
Index: core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java	(revision 1362267)
+++ core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java	(working copy)
@@ -18,13 +18,14 @@
 package kafka.javaapi.consumer;
 
 
-import java.util.List;
-import java.util.Map;
 import kafka.consumer.KafkaStream;
 import kafka.consumer.TopicFilter;
 import kafka.message.Message;
 import kafka.serializer.Decoder;
 
+import java.util.List;
+import java.util.Map;
+
 public interface ConsumerConnector {
   /**
    *  Create a list of MessageStreams of type T for each topic.
Index: perf/src/main/scala/kafka/perf/ProducerPerformance.scala
===================================================================
--- perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(revision 1362267)
+++ perf/src/main/scala/kafka/perf/ProducerPerformance.scala	(working copy)
@@ -131,7 +131,6 @@
     
     // override necessary flags in seqIdMode
     if (seqIdMode) { 
-      isAsync = true
       batchSize = 1
       isFixSize = true
 
@@ -175,6 +174,10 @@
       props.put("batch.size", config.batchSize.toString)
       props.put("queue.enqueueTimeout.ms", "-1")
     }
+    props.put("producer.request.required.acks", "-1")
+    props.put("producer.request.ack.timeout.ms", "3000")
+    props.put("socket.timeout.ms", "3000")
+
     val producerConfig = new ProducerConfig(props)
     val producer = new Producer[Message, Message](producerConfig)
     val seqIdNumDigit = 10   // no. of digits for max int value
@@ -198,12 +201,6 @@
       val topicLabel     = "Topic"
       var leftPaddedSeqId : String = ""
       
-      var messageSet: List[Message] = Nil
-      if(config.isFixSize) {
-        for(k <- 0 until config.batchSize) {
-          messageSet ::= message
-        }
-      }
       var j: Long = 0L
       while(j < messagesPerThread) {
         var strLength = config.messageSize
@@ -231,11 +228,17 @@
           debug(seqMsgString)
           message = new Message(seqMsgString.getBytes())
         }
-                
+
+        var messageSet: List[Message] = Nil
+        if(config.isFixSize) {
+          for(k <- 0 until config.batchSize) {
+            messageSet ::= message
+          }
+        }
+
         if (!config.isFixSize) {
           for(k <- 0 until config.batchSize) {
             strLength = rand.nextInt(config.messageSize)
-            val message = new Message(getByteArrayOfLength(strLength))
             messageSet ::= message
             bytesSent += message.payloadSize
           }
@@ -262,7 +265,7 @@
             nSends += 1
           }
         }catch {
-          case e: Exception => e.printStackTrace
+          case e: Exception => error("Error sending messages", e)
         }
         if(nSends % config.reportingInterval == 0) {
           reportTime = System.currentTimeMillis()
