Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1305462)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -166,7 +166,7 @@
 
     // restart server 1
     server1.startup()
-    Thread.sleep(500)
+    Thread.sleep(100)
 
     try {
       // cross check if broker 1 got the messages
Index: core/src/test/resources/log4j.properties
===================================================================
--- core/src/test/resources/log4j.properties	(revision 1305462)
+++ core/src/test/resources/log4j.properties	(working copy)
@@ -21,5 +21,5 @@
 log4j.logger.kafka=ERROR
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=ERROR
-log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN
Index: core/src/main/scala/kafka/producer/ProducerPool.scala
===================================================================
--- core/src/main/scala/kafka/producer/ProducerPool.scala	(revision 1305462)
+++ core/src/main/scala/kafka/producer/ProducerPool.scala	(working copy)
@@ -88,6 +88,7 @@
       val iter = syncProducers.values.iterator
       while(iter.hasNext)
         iter.next.close
+      zkClient.close()
     }
   }
 }
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1305462)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -32,8 +32,8 @@
 class KafkaServer(val config: KafkaConfig) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
-  private val isShuttingDown = new AtomicBoolean(false)  
-  private val shutdownLatch = new CountDownLatch(1)
+  private var isShuttingDown = new AtomicBoolean(false)
+  private var shutdownLatch = new CountDownLatch(1)
   private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
@@ -47,6 +47,8 @@
    */
   def startup() {
     info("Starting Kafka server...")
+    isShuttingDown = new AtomicBoolean(false)
+    shutdownLatch = new CountDownLatch(1)
     var needRecovery = true
     val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
     if (cleanShutDownFile.exists) {
