Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(working copy)
@@ -26,7 +26,6 @@
 import kafka.consumer.SimpleConsumer
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
-import org.apache.log4j._
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
@@ -44,8 +43,6 @@
   val brokerPort: Int = 9099
   var simpleConsumer: SimpleConsumer = null
 
-  private val logger = Logger.getLogger(classOf[LogOffsetTest])
-  
   @Before
   override def setUp() {
     super.setUp()
@@ -99,7 +96,7 @@
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "1")
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
@@ -137,7 +134,7 @@
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "1")
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
 
     var offsetChanged = false
     for(i <- 1 to 14) {
@@ -158,7 +155,7 @@
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 3, 1, "1,1,1")
+    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
@@ -185,7 +182,7 @@
     val part = Integer.valueOf(topicPartition.split("-").last).intValue
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, topic, 3, 1, "1,1,1")
+    CreateTopicCommand.createTopic(zkClient, topic, 3, 1, "1,1,1")
 
     val logManager = server.getLogManager
     val log = logManager.getOrCreateLog(topic, part)
Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -22,10 +22,10 @@
 import org.junit.Test
 import kafka.common.OffsetOutOfRangeException
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestZKUtils, Utils, MockTime, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
 import kafka.server.KafkaConfig
+import kafka.utils._
 
 class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -48,10 +48,10 @@
     logManager.startup
     logDir = logManager.logDir
 
-    TestUtils.createBrokersInZk(zookeeper.client, List(config.brokerId))
+    TestUtils.createBrokersInZk(zkClient, List(config.brokerId))
 
     // setup brokers in zookeeper as owners of partitions for this test
-    CreateTopicCommand.createTopic(zookeeper.client, name, 3, 1, "0,0,0")
+    CreateTopicCommand.createTopic(zkClient, name, 3, 1, "0,0,0")
   }
 
   override def tearDown() {
Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(working copy)
@@ -22,26 +22,22 @@
 import kafka.admin.CreateTopicCommand
 import java.nio.ByteBuffer
 import kafka.log.LogManager
-import kafka.utils.TestUtils
 import junit.framework.Assert._
-import org.I0Itec.zkclient.ZkClient
-import TestUtils._
 import org.easymock.EasyMock
 import kafka.network.BoundedByteBufferReceive
 import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
 import kafka.cluster.Broker
 import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.utils.TestUtils._
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
-  var zkClient: ZkClient = null
   var brokers: Seq[Broker] = null
 
   override def setUp() {
     super.setUp()
-    zkClient = zookeeper.client
-    // create brokers in zookeeper
     brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
   }
 
Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(working copy)
@@ -28,8 +28,8 @@
 import org.scalatest.junit.JUnit3Suite
 import kafka.integration.KafkaServerTestHarness
 import kafka.producer.{ProducerData, Producer}
+import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -67,7 +67,7 @@
   def testFetcher() {
     val perNode = 2
     var count = sendMessages(perNode)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     fetch(count)
     Thread.sleep(100)
     assertQueueEmpty()
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(working copy)
@@ -24,10 +24,12 @@
 import kafka.integration.KafkaServerTestHarness
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 import kafka.server.KafkaConfig
-import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import java.net.SocketTimeoutException
+import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.serialize.ZkSerializer
+import kafka.utils.{ZKStringSerializer, TestZKUtils, SystemTime, TestUtils}
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
@@ -116,10 +118,12 @@
     response.offsets.foreach(Assert.assertEquals(-1L, _))
 
     // #2 - test that we get correct offsets when partition is owner by broker
-    val zkClient = zookeeper.client
+    val zkClient = new ZkClient(zookeeperConnect)
+    zkClient.setZkSerializer(ZKStringSerializer)
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
     CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
 
+    Thread.sleep(500)
     val response2 = producer.send(request)
     Assert.assertEquals(request.correlationId, response2.correlationId)
     Assert.assertEquals(response2.errors.length, response2.offsets.length)
@@ -134,6 +138,7 @@
     // the middle message should have been rejected because broker doesn't lead partition
     Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, response2.errors(1))
     Assert.assertEquals(-1, response2.offsets(1))
+    zkClient.close()
   }
 
   @Test
Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(working copy)
@@ -30,23 +30,19 @@
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils._
-import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import collection.Map
 import collection.mutable.ListBuffer
-import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
 
 class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
   val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
-  var zkClient: ZkClient = null
   var brokers: Seq[Broker] = null
 
   override def setUp() {
     super.setUp()
-    zkClient = zookeeper.client
-    // create brokers in zookeeper
     brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
   }
 
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -27,10 +27,9 @@
 import kafka.server.{KafkaConfig, KafkaRequestHandler, KafkaServer}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.log4j.{Level, Logger}
-import org.I0Itec.zkclient.ZkClient
 import org.junit.Assert._
 import org.junit.Test
-import kafka.utils.{SystemTime, TestZKUtils, Utils, TestUtils}
+import kafka.utils._
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId1 = 0
@@ -42,13 +41,10 @@
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
   private val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
-  private var zkClient: ZkClient = null
 
   override def setUp() {
     super.setUp()
     // set up 2 brokers with 4 partitions each
-    zkClient = zookeeper.client
-
     val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
     val config1 = new KafkaConfig(props1) {
       override val numPartitions = 4
@@ -166,7 +162,7 @@
 
     // restart server 1
     server1.startup()
-    Thread.sleep(500)
+    Thread.sleep(100)
 
     try {
       // cross check if broker 1 got the messages
Index: core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala	(working copy)
@@ -20,9 +20,8 @@
 import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxn
 import kafka.utils.TestUtils
-import org.I0Itec.zkclient.ZkClient
 import java.net.InetSocketAddress
-import kafka.utils.{Utils, ZKStringSerializer}
+import kafka.utils.Utils
 
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
@@ -32,8 +31,6 @@
   val port = connectString.split(":")(1).toInt
   val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
   factory.startup(zookeeper)
-  val client = new ZkClient(connectString)
-  client.setZkSerializer(ZKStringSerializer)
 
   def shutdown() {
     factory.shutdown()
Index: core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala	(working copy)
@@ -18,19 +18,24 @@
 package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.TestZKUtils
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZKStringSerializer, TestZKUtils}
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
   val zkConnect: String = TestZKUtils.zookeeperConnect
   var zookeeper: EmbeddedZookeeper = null
+  var zkClient: ZkClient = null
 
   override def setUp() {
     zookeeper = new EmbeddedZookeeper(zkConnect)
+    zkClient = new ZkClient(zookeeper.connectString)
+    zkClient.setZkSerializer(ZKStringSerializer)
     super.setUp
   }
 
   override def tearDown() {
     super.tearDown
+    zkClient.close()
     zookeeper.shutdown()
   }
 
Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(working copy)
@@ -20,10 +20,9 @@
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
-import org.I0Itec.zkclient.ZkClient
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{ZKStringSerializer, Utils, TestUtils}
+import kafka.utils.{Utils, TestUtils}
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -37,13 +36,10 @@
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
 
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
-  var zkClient: ZkClient = null
 
   override def setUp() {
     super.setUp()
 
-    zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
-
     // start both servers
     val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
     val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(working copy)
@@ -48,7 +48,7 @@
       server.startup()
 
       // create topic
-      CreateTopicCommand.createTopic(zookeeper.client, topic, 1, 1, "0")
+      CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
 
       val producer = new Producer[Int, Message](getProducerConfig(64*1024, 100000, 10000))
 
@@ -60,6 +60,7 @@
       server.shutdown()
       val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile)
       assertTrue(cleanShutDownFile.exists)
+      producer.close()
     }
 
 
@@ -73,7 +74,7 @@
       val server = new KafkaServer(config)
       server.startup()
 
-      waitUntilLeaderIsElected(zookeeper.client, topic, 0, 1000)
+      waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
 
       var fetchedMessage: ByteBufferMessageSet = null
       while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
@@ -97,6 +98,7 @@
 
       server.shutdown()
       Utils.rm(server.config.logDir)
+      producer.close()
     }
 
   }
Index: core/src/test/scala/unit/kafka/admin/AdminTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/admin/AdminTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/admin/AdminTest.scala	(working copy)
@@ -134,18 +134,18 @@
       List("1", "2", "3"),
       List("1", "3", "4")      
       )
-    TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3, 4))
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
 
     val topic = "test"
     // create the topic
-    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
-    val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
                                   .get.partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
     expectedReplicaAssignment.toList.zip(actualReplicaList).foreach(l => assertEquals(l._1, l._2))
 
     try {
-      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
       fail("shouldn't be able to create a topic already exists")
     }
     catch {
@@ -161,10 +161,10 @@
       List("1", "2", "3")
     )
     val topic = "auto-topic"
-    TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3))
-    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+    TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
 
-    val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+    val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
     newTopicMetadata match {
       case Some(metadata) => assertEquals(topic, metadata.topic)
         assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(working copy)
@@ -52,14 +52,16 @@
   val consumer2 = "consumer2"
   val consumer3 = "consumer3"
   val nMessages = 2
-  var zkClient: ZkClient = null
 
   override def setUp() {
     super.setUp()
     dirs = new ZKGroupTopicDirs(group, topic)
-    zkClient = new ZkClient(zkConnect, 6000, 3000, ZKStringSerializer)
   }
 
+  override def tearDown() {
+    super.tearDown()
+  }
+
   def testBasic() {
     val requestHandlerLogger = Logger.getLogger(classOf[KafkaApis])
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -98,8 +100,8 @@
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1.size, receivedMessages1.size)
@@ -124,8 +126,8 @@
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -149,8 +151,8 @@
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -178,8 +180,8 @@
     val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
@@ -209,8 +211,8 @@
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -234,8 +236,8 @@
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -438,11 +440,11 @@
 
   def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
     import scala.collection.JavaConversions
-    val children = zookeeper.client.getChildren(path)
+    val children = zkClient.getChildren(path)
     Collections.sort(children)
     val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
     childrenAsSeq.map(partition =>
-      (partition, zookeeper.client.readData(path + "/" + partition).asInstanceOf[String]))
+      (partition, zkClient.readData(path + "/" + partition).asInstanceOf[String]))
   }
 
 }
Index: core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala	(revision 1307120)
+++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala	(working copy)
@@ -26,8 +26,8 @@
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 import kafka.javaapi.producer.{ProducerData, Producer}
+import kafka.utils.TestUtils._
 import kafka.utils.{Utils, Logging, TestUtils}
-import kafka.utils.TestUtils._
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
@@ -53,8 +53,8 @@
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1")
 
-    waitUntilLeaderIsElected(zookeeper.client, topic, 0, 500)
-    waitUntilLeaderIsElected(zookeeper.client, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
Index: core/src/test/resources/log4j.properties
===================================================================
--- core/src/test/resources/log4j.properties	(revision 1307120)
+++ core/src/test/resources/log4j.properties	(working copy)
@@ -4,9 +4,9 @@
 # The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
-# 
+#
 #    http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,5 +21,5 @@
 log4j.logger.kafka=ERROR
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=ERROR
-log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN
Index: core/src/main/scala/kafka/producer/ProducerPool.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerPool.scala	(revision 1307120)
+++ core/src/main/scala/kafka/producer/ProducerPool.scala	(working copy)
@@ -88,6 +88,7 @@
       val iter = syncProducers.values.iterator
       while(iter.hasNext)
         iter.next.close
+      zkClient.close()
     }
   }
 }
Index: core/src/main/scala/kafka/utils/Utils.scala
===================================================================
--- core/src/main/scala/kafka/utils/Utils.scala	(revision 1307120)
+++ core/src/main/scala/kafka/utils/Utils.scala	(working copy)
@@ -29,7 +29,6 @@
 import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
 import java.util.{Random, Properties}
-import kafka.network.{BoundedByteBufferReceive, Receive, BoundedByteBufferSend, Request}
 
 /**
  * Helper functions!
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1307120)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -17,13 +17,13 @@
 
 package kafka.server
 
-import java.util.concurrent._
-import java.util.concurrent.atomic._
 import java.io.File
 import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
 import kafka.cluster.Replica
+import java.util.concurrent._
+import atomic.AtomicBoolean
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -32,8 +32,10 @@
 class KafkaServer(val config: KafkaConfig) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
-  private val isShuttingDown = new AtomicBoolean(false)  
-  private val shutdownLatch = new CountDownLatch(1)
+  private val isShuttingDown = new AtomicBoolean(false)
+  private var shutdownLatch = new CountDownLatch(1)
+  private val isStartingUp = new AtomicBoolean(false)
+  private var startupComplete = new CountDownLatch(1)
   private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
@@ -46,43 +48,56 @@
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
   def startup() {
-    info("Starting Kafka server...")
-    var needRecovery = true
-    val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
-    if (cleanShutDownFile.exists) {
-      needRecovery = false
-      cleanShutDownFile.delete
+    // if the server is in the process of shutting down, wait for the shutdown to complete before starting it
+    if(isShuttingDown.get()) {
+      info("Server is in the process of shutting down. Waiting for shutdown to complete, before attempting to start the server")
+      shutdownLatch.await()
     }
-    logManager = new LogManager(config,
-                                SystemTime,
-                                1000L * 60 * config.logCleanupIntervalMinutes,
-                                1000L * 60 * 60 * config.logRetentionHours,
-                                needRecovery)
-                                                
-    socketServer = new SocketServer(config.port,
-                                    config.numNetworkThreads,
-                                    config.monitoringPeriodSecs,
-                                    config.numQueuedRequests,
-                                    config.maxSocketRequestSize)
-    Utils.registerMBean(socketServer.stats, statsMBeanName)
+    // if the server is not shutting down, not currently starting up AND hasn't already started, startup can proceed
+    if(!isStartingUp.get() && !isShuttingDown.get() && (startupComplete.getCount == 1)) {
+      isStartingUp.compareAndSet(false, true)
+      info("Starting Kafka server...")
+      var needRecovery = true
+      val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
+      if (cleanShutDownFile.exists) {
+        needRecovery = false
+        cleanShutDownFile.delete
+      }
+      logManager = new LogManager(config,
+        SystemTime,
+        1000L * 60 * config.logCleanupIntervalMinutes,
+        1000L * 60 * 60 * config.logRetentionHours,
+        needRecovery)
 
-    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica)
+      socketServer = new SocketServer(config.port,
+        config.numNetworkThreads,
+        config.monitoringPeriodSecs,
+        config.numQueuedRequests,
+        config.maxSocketRequestSize)
+      Utils.registerMBean(socketServer.stats, statsMBeanName)
 
-    requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel,
-      new KafkaApis(logManager, kafkaZookeeper).handle, config.numIoThreads)
-    socketServer.startup
+      kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica)
 
-    Mx4jLoader.maybeLoad
+      requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel,
+        new KafkaApis(logManager, kafkaZookeeper).handle, config.numIoThreads)
+      socketServer.startup
 
-    /**
-     *  Registers this broker in ZK. After this, consumers can connect to broker.
-     *  So this should happen after socket server start.
-     */
-    logManager.startup
+      Mx4jLoader.maybeLoad
 
-    // starting relevant replicas and leader election for partitions assigned to this broker
-    kafkaZookeeper.startup
-    info("Server started.")
+      /**
+       *  Registers this broker in ZK. After this, consumers can connect to broker.
+       *  So this should happen after socket server start.
+       */
+      logManager.startup
+
+      // starting relevant replicas and leader election for partitions assigned to this broker
+      kafkaZookeeper.startup
+      isStartingUp.compareAndSet(true, false)
+      // since startup is complete, reset the shutdown latch to allow shutdown to happen
+      shutdownLatch = new CountDownLatch(1)
+      startupComplete.countDown()
+      info("Server started.")
+    }
   }
   
   /**
@@ -90,8 +105,15 @@
    * Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
    */
   def shutdown() {
-    val canShutdown = isShuttingDown.compareAndSet(false, true);
+    if(isStartingUp.get()) {
+      // wait for the start up to complete
+      info("Server is in the process of starting up. Waiting for startup to complete, before attempting to shutdown the server")
+      startupComplete.await()
+    }
+    // if the server is not starting up, not currently shutting down AND hasn't already shut down, shutdown can proceed
+    val canShutdown = !isShuttingDown.get() && !isStartingUp.get() && (shutdownLatch.getCount == 1)
     if (canShutdown) {
+      isShuttingDown.compareAndSet(false, true)
       info("Shutting down Kafka server with id " + config.brokerId)
       kafkaZookeeper.close
       if (socketServer != null)
@@ -105,6 +127,9 @@
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
       debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
       cleanShutDownFile.createNewFile
+      isShuttingDown.compareAndSet(true, false)
+      // since shutdown is complete, allow startup to happen
+      startupComplete = new CountDownLatch(1)
       shutdownLatch.countDown()
       info("Kafka server shut down completed")
     }
