Index: core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala	(working copy)
@@ -23,27 +23,27 @@
 import java.io.File
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.TestUtils
-import kafka.utils.{Utils, Logging}
 import junit.framework.Assert._
 import kafka.api.FetchRequest
 import kafka.serializer.Encoder
-import kafka.message.{MessageSet, Message}
+import kafka.message.Message
 import kafka.producer.async.MissingConfigException
-import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestUtils, Utils, Logging}
 
-class KafkaLog4jAppenderTest extends JUnitSuite with Logging {
+class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   var logDir: File = null
-  //  var topicLogDir: File = null
   var server: KafkaServer = null
   val brokerPort: Int = 9092
   var simpleConsumer: SimpleConsumer = null
   val tLogger = Logger.getLogger(getClass())
 
   @Before
-  def setUp() {
+  override def setUp() {
+    super.setUp()
     val config: Properties = createBrokerConfig(1, brokerPort)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
@@ -54,11 +54,12 @@
   }
 
   @After
-  def tearDown() {
+  override def tearDown() {
     simpleConsumer.close
     server.shutdown
     Thread.sleep(100)
     Utils.rm(logDir)
+    super.tearDown()
   }
 
   @Test
@@ -171,6 +172,7 @@
     props.put("log.retention.hours", "10")
     props.put("log.cleanup.interval.mins", "5")
     props.put("log.file.size", "1000")
+    props.put("zk.connect", zkConnect.toString)
     props
   }
 
Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala	(working copy)
@@ -29,6 +29,7 @@
 import kafka.message._
 import org.I0Itec.zkclient.ZkClient
 import kafka.consumer.ConsumerConfig
+import kafka.cluster.Broker
 
 /**
  * Utility functions to help with testing
@@ -301,6 +302,12 @@
     }
   }
 
+  def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
+    val brokers = ids.map(id => new Broker(id, "localhost" + System.currentTimeMillis(), "localhost", 6667))
+    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, b.host, b.creatorId, b.port))
+    brokers
+  }
+
 }
 
 object TestZKUtils {
Index: core/src/test/scala/unit/kafka/log/LogTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/log/LogTest.scala	(working copy)
@@ -18,14 +18,13 @@
 package kafka.log
 
 import java.io._
-import java.nio._
 import java.util.ArrayList
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils.{Utils, TestUtils, Range}
 import kafka.common.OffsetOutOfRangeException
-import kafka.message.{NoCompressionCodec, MessageSet, ByteBufferMessageSet, Message}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 
 class LogTest extends JUnitSuite {
   
Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(working copy)
@@ -25,16 +25,17 @@
 import kafka.api.{FetchRequest, OffsetRequest}
 import collection.mutable.WrappedArray
 import kafka.consumer.SimpleConsumer
-import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import org.apache.log4j._
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
 
 object LogOffsetTest {
   val random = new Random()  
 }
 
-class LogOffsetTest extends JUnitSuite {
+class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
   var logDir: File = null
   var topicLogDir: File = null
   var server: KafkaServer = null
@@ -45,7 +46,8 @@
   private val logger = Logger.getLogger(classOf[LogOffsetTest])
   
   @Before
-  def setUp() {
+  override def setUp() {
+    super.setUp()
     val config: Properties = createBrokerConfig(1, brokerPort)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
@@ -55,10 +57,11 @@
   }
 
   @After
-  def tearDown() {
+  override def tearDown() {
     simpleConsumer.close
     server.shutdown
     Utils.rm(logDir)
+    super.tearDown()
   }
 
   @Test
@@ -206,6 +209,7 @@
     props.put("log.retention.hours", "10")
     props.put("log.cleanup.interval.mins", "5")
     props.put("log.file.size", logSize.toString)
+    props.put("zk.connect", zkConnect.toString)
     props
   }
 
Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -20,36 +20,37 @@
 import java.io._
 import junit.framework.Assert._
 import kafka.server.KafkaConfig
-import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Before, Test}
-import kafka.utils.{Utils, MockTime, TestUtils}
+import org.junit.Test
 import kafka.common.OffsetOutOfRangeException
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestZKUtils, Utils, MockTime, TestUtils}
+import org.scalatest.junit.JUnit3Suite
 
-class LogManagerTest extends JUnitSuite {
+class LogManagerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   val time: MockTime = new MockTime()
   val maxLogAge = 1000
   var logDir: File = null
   var logManager: LogManager = null
   var config:KafkaConfig = null
+  val zookeeperConnect = TestZKUtils.zookeeperConnect
 
-  @Before
-  def setUp() {
+  override def setUp() {
+    super.setUp()
     val props = TestUtils.createBrokerConfig(0, -1)
     config = new KafkaConfig(props) {
                    override val logFileSize = 1024
-                   override val enableZookeeper = false
                  }
     logManager = new LogManager(config, null, time, -1, maxLogAge, false)
     logManager.startup
     logDir = logManager.logDir
   }
 
-  @After
-  def tearDown() {
+  override def tearDown() {
     if(logManager != null)
       logManager.close()
     Utils.rm(logDir)
+    super.tearDown()
   }
   
   @Test
@@ -107,7 +108,6 @@
     Thread.sleep(100)
     config = new KafkaConfig(props) {
       override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
-      override val enableZookeeper = false
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over
       override val logRetentionHours = retentionHours
     }
@@ -152,7 +152,6 @@
     Thread.sleep(100)
     config = new KafkaConfig(props) {
                    override val logFileSize = 1024 *1024 *1024 
-                   override val enableZookeeper = false
                    override val flushSchedulerThreadRate = 50
                    override val flushInterval = Int.MaxValue
                    override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
@@ -176,7 +175,6 @@
     Thread.sleep(100)
     config = new KafkaConfig(props) {
                    override val logFileSize = 256
-                   override val enableZookeeper = false
                    override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
                  }
     
Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 0)
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.integration
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.admin.CreateTopicCommand
+import java.nio.ByteBuffer
+import kafka.log.LogManager
+import kafka.utils.TestUtils
+import kafka.server.{KafkaApis, KafkaConfig}
+import junit.framework.Assert._
+import org.I0Itec.zkclient.ZkClient
+import TestUtils._
+import org.easymock.EasyMock
+import kafka.network.BoundedByteBufferReceive
+import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
+import kafka.cluster.Broker
+
+class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val props = createBrokerConfigs(1)
+  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  var zkClient: ZkClient = null
+  var brokers: Seq[Broker] = null
+
+  override def setUp() {
+    super.setUp()
+    zkClient = zookeeper.client
+    // create brokers in zookeeper
+    brokers = TestUtils.createBrokersInZk(zkClient, configs.map(config => config.brokerId))
+  }
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  def testTopicMetadataRequest {
+    // create topic
+    val topic = "test"
+    CreateTopicCommand.createTopic(zkClient, topic, 1)
+
+    // create a topic metadata request
+    val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+
+    val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
+    topicMetadataRequest.writeTo(serializedMetadataRequest)
+    serializedMetadataRequest.rewind()
+    val deserializedMetadataRequest = TopicMetadataRequest.readFrom(serializedMetadataRequest)
+
+    assertEquals(topicMetadataRequest, deserializedMetadataRequest)
+  }
+
+  def testBasicTopicMetadata {
+    // create topic
+    val topic = "test"
+    CreateTopicCommand.createTopic(zkClient, topic, 1)
+
+    mockLogManagerAndTestTopic(topic)
+  }
+
+  def testAutoCreateTopic {
+    // auto create topic
+    val topic = "test"
+
+    mockLogManagerAndTestTopic(topic)
+  }
+
+  private def mockLogManagerAndTestTopic(topic: String) = {
+    // topic metadata request only requires 2 APIs from the log manager
+    val logManager = EasyMock.createMock(classOf[LogManager])
+    EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
+    EasyMock.expect(logManager.getZookeeperClient).andReturn(zkClient)
+    EasyMock.replay(logManager)
+
+    // create a topic metadata request
+    val topicMetadataRequest = new TopicMetadataRequest(List(topic))
+
+    val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
+    topicMetadataRequest.writeTo(serializedMetadataRequest)
+    serializedMetadataRequest.rewind()
+
+    // create the kafka request handler
+    val kafkaRequestHandler = new KafkaApis(logManager)
+
+    // mock the receive API to return the request buffer as created above
+    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
+    EasyMock.expect(receivedRequest.buffer).andReturn(serializedMetadataRequest)
+    EasyMock.replay(receivedRequest)
+
+    // call the API (to be tested) to get metadata
+    val metadataResponse = kafkaRequestHandler.handleTopicMetadataRequest(receivedRequest)
+
+    // verify the topic metadata returned
+    metadataResponse match {
+      case Some(metadata) =>
+        val responseBuffer = metadata.asInstanceOf[TopicMetadataSend].metadata
+        val topicMetadata = TopicMetadataRequest.deserializeTopicsMetadataResponse(responseBuffer)
+        assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
+        assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
+        val partitionMetadata = topicMetadata.head.partitionsMetadata
+        assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+        assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+        assertEquals(brokers, partitionMetadata.head.replicas)
+        assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
+      case None =>
+        fail("Metadata response expected")
+    }
+
+    // verify the expected calls to log manager occurred in the right order
+    EasyMock.verify(logManager)
+    EasyMock.verify(receivedRequest)
+  }
+}
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(working copy)
@@ -34,9 +34,7 @@
   val numNodes = 2
   val configs = 
     for(props <- TestUtils.createBrokerConfigs(numNodes))
-      yield new KafkaConfig(props) {
-        override val enableZookeeper = false
-      }
+      yield new KafkaConfig(props)
   val messages = new mutable.HashMap[Int, ByteBufferMessageSet]
   val topic = "topic"
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
Index: core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(working copy)
@@ -26,11 +26,10 @@
 import kafka.server._
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.{TestUtils, TestZKUtils}
+import kafka.utils.TestUtils
 
 class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
-  val zkConnect = TestZKUtils.zookeeperConnect
   val topic = "test_topic"
   val group = "default_group"
   val testConsumer = "consumer"
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -27,9 +27,9 @@
 import java.util.Properties
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
 import kafka.serializer.StringDecoder
-import kafka.utils.TestUtils
 import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
 import java.io.File
+import kafka.utils.TestUtils
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -39,7 +39,6 @@
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props) {
-                 override val enableZookeeper = false
                  override val flushInterval = 1
                }
   val configs = List(config)
Index: core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala	(working copy)
@@ -23,8 +23,7 @@
 import kafka.utils.Utils
 import kafka.api.FetchRequest
 import kafka.common.InvalidMessageSizeException
-import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.consumer.{ZookeeperConsumerConnector, ConsumerConfig}
 import org.scalatest.junit.JUnit3Suite
 import kafka.integration.ProducerConsumerTestHarness
@@ -32,13 +31,11 @@
 import org.apache.log4j.{Logger, Level}
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
 
-class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness with ZooKeeperTestHarness {
-  val zkConnect = TestZKUtils.zookeeperConnect  
+class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
   val config = new KafkaConfig(props) {
                  override val hostName = "localhost"
-                 override val enableZookeeper = true
                }
   val configs = List(config)
   val topic = "test"
Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(working copy)
@@ -18,25 +18,23 @@
 package kafka.integration
 
 import scala.collection._
-import junit.framework.Assert._
 import kafka.common.OffsetOutOfRangeException
 import kafka.api.{ProducerRequest, FetchRequest}
 import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 import org.apache.log4j.{Level, Logger}
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.{TestUtils, Utils}
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestUtils, Utils}
 
 /**
  * End to end tests of the primitive apis against a local server
  */
-class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness   {
+class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness  {
 
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props) {
-                 override val enableZookeeper = false
-               }
+  val config = new KafkaConfig(props)
   val configs = List(config)
   var servers: List[KafkaServer] = null
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
Index: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala	(working copy)
@@ -17,34 +17,29 @@
 
 package kafka.integration
 
-import java.util.Properties
-import junit.framework.Assert._
-import kafka.producer._
-import kafka.consumer._
-import kafka.message._
 import kafka.server._
 import kafka.utils.{Utils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
 
 /**
  * A test harness that brings up some number of broker nodes
  */
-trait KafkaServerTestHarness extends JUnit3Suite {
+trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
 
   val configs: List[KafkaConfig]
   var servers: List[KafkaServer] = null
 
   override def setUp() {
+    super.setUp
     if(configs.size <= 0)
       throw new IllegalArgumentException("Must suply at least one server config.")
     servers = configs.map(TestUtils.createServer(_))
-    super.setUp
   }
 
   override def tearDown() {
-    super.tearDown
     servers.map(server => server.shutdown())
     servers.map(server => Utils.rm(server.config.logDir))
+    super.tearDown
   }
-
 }
Index: core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala	(working copy)
@@ -17,17 +17,16 @@
 
 package kafka.integration
 
-import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.server.KafkaConfig
 import org.scalatest.junit.JUnit3Suite
 import org.apache.log4j.Logger
 import java.util.Properties
 import kafka.consumer.SimpleConsumer
-import kafka.utils.{Utils, TestUtils}
 import kafka.api.{OffsetRequest, FetchRequest}
 import junit.framework.Assert._
-import java.io.File
+import kafka.utils.TestUtils
 
-class BackwardsCompatibilityTest extends JUnit3Suite {
+class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
 
   val topic = "MagicByte0"
   val group = "default_group"
@@ -40,24 +39,19 @@
   kafkaProps.put("brokerid", "12")
   kafkaProps.put("port", port.toString)
   kafkaProps.put("log.dir", kafkaLogDir.getPath)
-  val kafkaConfig =
-    new KafkaConfig(kafkaProps) {
-      override val enableZookeeper = false
-    }
-  var kafkaServer : KafkaServer = null
+  kafkaProps.put("zk.connect", zkConnect.toString)
+  val configs = List(new KafkaConfig(kafkaProps))
   var simpleConsumer: SimpleConsumer = null
 
   private val logger = Logger.getLogger(getClass())
 
   override def setUp() {
     super.setUp()
-    kafkaServer = TestUtils.createServer(kafkaConfig)
     simpleConsumer = new SimpleConsumer(host, port, 1000000, 64*1024)
   }
 
   override def tearDown() {
     simpleConsumer.close
-    kafkaServer.shutdown
     super.tearDown
   }
 
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(working copy)
@@ -17,38 +17,23 @@
 
 package kafka.producer
 
-import junit.framework.{Assert, TestCase}
-import kafka.utils.SystemTime
-import kafka.utils.TestUtils
-import kafka.server.{KafkaServer, KafkaConfig}
-import org.apache.log4j.{Logger, Level}
-import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Before, Test}
+import junit.framework.Assert
+import kafka.server.KafkaConfig
 import kafka.common.MessageSizeTooLargeException
 import java.util.Properties
 import kafka.api.ProducerRequest
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.integration.KafkaServerTestHarness
+import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
+import org.scalatest.junit.JUnit3Suite
 
-class SyncProducerTest extends JUnitSuite {
+class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
-  private var server: KafkaServer = null
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1).head))
+  val zookeeperConnect = TestZKUtils.zookeeperConnect
 
-  @Before
-  def setUp() {
-    server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort))
-    {
-      override val enableZookeeper = false
-    })
-  }
-
-  @After
-  def tearDown() {
-    if(server != null)
-      server.shutdown
-  }
-
-  @Test
   def testReachableServer() {
+    val server = servers.head
     val props = new Properties()
     props.put("host", "localhost")
     props.put("port", server.socketServer.port.toString)
@@ -85,8 +70,8 @@
     Assert.assertFalse(failed)
   }
 
-  @Test
   def testMessageSizeTooLarge() {
+    val server = servers.head
     val props = new Properties()
     props.put("host", "localhost")
     props.put("port", server.socketServer.port.toString)
Index: core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala	(working copy)
@@ -17,7 +17,7 @@
 
 package kafka.producer
 
-import junit.framework.{Assert, TestCase}
+import junit.framework.Assert
 import java.util.Properties
 import org.easymock.EasyMock
 import kafka.api.ProducerRequest
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -17,14 +17,13 @@
 
 package kafka.producer
 
-import async.{AsyncProducerConfig, AsyncProducer}
+import async.AsyncProducer
 import java.util.Properties
 import org.apache.log4j.{Logger, Level}
 import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 import kafka.zk.EmbeddedZookeeper
 import org.junit.{After, Before, Test}
 import junit.framework.Assert
-import collection.mutable.HashMap
 import org.easymock.EasyMock
 import java.util.concurrent.ConcurrentHashMap
 import kafka.cluster.Partition
Index: core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala	(working copy)
@@ -18,12 +18,9 @@
 package kafka.message
 
 import java.nio._
-import java.util.Arrays
-import junit.framework.TestCase
 import junit.framework.Assert._
 import kafka.utils.TestUtils._
 import org.junit.Test
-import kafka.message._
 
 class FileMessageSetTest extends BaseMessageSetTestCases {
   
Index: core/src/test/scala/unit/kafka/message/MessageTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/message/MessageTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/message/MessageTest.scala	(working copy)
@@ -19,9 +19,6 @@
 
 import java.util._
 import java.nio._
-import java.nio.channels._
-import java.io._
-import junit.framework.TestCase
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{Before, Test}
Index: core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
===================================================================
--- core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.message
 
-import java.util.Arrays
 import junit.framework.Assert._
 import kafka.utils.TestUtils._
 import org.scalatest.junit.JUnitSuite
Index: core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala	(working copy)
@@ -20,12 +20,11 @@
 import kafka.consumer.ConsumerConfig
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ZkUtils, ZKStringSerializer}
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.TestUtils
 import org.junit.Assert
 import org.scalatest.junit.JUnit3Suite
 
 class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val zkConnect = TestZKUtils.zookeeperConnect
   var zkSessionTimeoutMs = 1000
 
   def testEphemeralNodeCleanup = {
Index: core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala	(working copy)
@@ -18,9 +18,10 @@
 package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
+import kafka.utils.TestZKUtils
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
-  val zkConnect: String
+  val zkConnect: String = TestZKUtils.zookeeperConnect
   var zookeeper: EmbeddedZookeeper = null
 
   override def setUp() {
Index: core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala	(working copy)
@@ -22,10 +22,9 @@
 import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
 import java.lang.Thread
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.{TestUtils, ZkUtils, ZKGroupTopicDirs, TestZKUtils}
+import kafka.utils.{TestUtils, ZkUtils, ZKGroupTopicDirs}
 
 class ZKLoadBalanceTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val zkConnect = TestZKUtils.zookeeperConnect
   var dirs : ZKGroupTopicDirs = null
   val topic = "topic1"
   val group = "group1"
Index: core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala	(working copy)
@@ -16,29 +16,25 @@
  */
 package kafka.server
 
-import kafka.utils.TestUtils
 import java.io.File
-import kafka.utils.Utils
 import kafka.api.FetchRequest
-import kafka.integration.ProducerConsumerTestHarness
 import kafka.producer.{SyncProducer, SyncProducerConfig}
 import kafka.consumer.SimpleConsumer
 import java.util.Properties
-import org.scalatest.junit.JUnitSuite
-import junit.framework.{Assert, TestCase}
-import org.junit.{After, Before, Test}
+import org.junit.Test
 import junit.framework.Assert._
 import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestUtils, Utils}
 
-class ServerShutdownTest extends JUnitSuite {
+class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
 
   @Test
   def testCleanShutdown() {
     val props = TestUtils.createBrokerConfig(0, port)
-    val config = new KafkaConfig(props) {
-      override val enableZookeeper = false
-    }
+    val config = new KafkaConfig(props)
 
     val host = "localhost"
     val topic = "test"
Index: core/src/test/scala/unit/kafka/admin/AdminTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/admin/AdminTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/admin/AdminTest.scala	(working copy)
@@ -20,18 +20,17 @@
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.TestZKUtils
+import kafka.utils.TestUtils
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
-  val zkConnect = TestZKUtils.zookeeperConnect
- 
+
   @Test
   def testReplicaAssignment() {
     val brokerList = List("0", "1", "2", "3", "4")
 
     // test 0 replication factor
     try {
-      AdminUtils.assginReplicasToBrokers(brokerList, 10, 0)
+      AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
       fail("shouldn't allow replication factor 0")
     }
     catch {
@@ -41,7 +40,7 @@
 
     // test wrong replication factor
     try {
-      AdminUtils.assginReplicasToBrokers(brokerList, 10, 6)
+      AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
       fail("shouldn't allow replication factor larger than # of brokers")
     }
     catch {
@@ -64,7 +63,7 @@
         List("4", "1", "2")
         )
 
-      val actualAssignment = AdminUtils.assginReplicasToBrokers(brokerList, 10, 3, 0)
+      val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
       val e = (expectedAssignment.toList == actualAssignment.toList)
       assertTrue(expectedAssignment.toList == actualAssignment.toList)
     }
@@ -121,7 +120,7 @@
 
   @Test
   def testTopicCreationInZK() {
-    val expectedReplicationAssignment = Array(
+    val expectedReplicaAssignment = Array(
       List("0", "1", "2"),
       List("1", "2", "3"),
       List("2", "3", "4"),
@@ -135,18 +134,46 @@
       List("1", "2", "3"),
       List("1", "3", "4")      
       )
+    TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3, 4))
+
     val topic = "test"
-    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicationAssignment, zookeeper.client)
-    val actualReplicationAssignment = AdminUtils.getTopicMetaDataFromZK(topic, zookeeper.client).get.map(p => p.replicaList)
-    assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList)
+    // create the topic
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+    val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+                                  .get.partitionsMetadata.map(p => p.replicas)
+    val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
+    expectedReplicaAssignment.toList.zip(actualReplicaList).foreach(l => assertEquals(l._1, l._2))
 
     try {
-      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicationAssignment, zookeeper.client)
-      fail("shouldn't be able to create a topic already exist")
+      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+      fail("shouldn't be able to create a topic that already exists")
     }
     catch {
       case e: AdministrationException => // this is good
       case e2 => throw e2
     }
   }
+
+  @Test
+  def testGetTopicMetadata() {
+    val expectedReplicaAssignment = Array(
+      List("0", "1", "2"),
+      List("1", "2", "3")
+    )
+    val topic = "auto-topic"
+    TestUtils.createBrokersInZk(zookeeper.client, List(0, 1, 2, 3))
+    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zookeeper.client)
+
+    val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zookeeper.client).head
+    newTopicMetadata match {
+      case Some(metadata) => assertEquals(topic, metadata.topic)
+        assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
+        assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
+        assertNull("leader should not be assigned for now", metadata.partitionsMetadata.head.leader.getOrElse(null))
+        val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
+        val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
+        assertEquals(expectedReplicaAssignment.toList, actualReplicaList)
+      case None => fail("Topic " + topic + " should've been automatically created")
+    }
+  }
 }
Index: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/network/SocketServerTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/network/SocketServerTest.scala	(working copy)
@@ -19,15 +19,11 @@
 
 import java.net._
 import java.io._
-import java.nio._
-import java.nio.channels._
 import org.junit._
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import kafka.utils.TestUtils
-import kafka.network._
 import java.util.Random
-import org.apache.log4j._
 
 class SocketServerTest extends JUnitSuite {
 
Index: core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala	(working copy)
@@ -19,7 +19,6 @@
 package kafka.consumer
 
 import junit.framework.Assert._
-import kafka.zk.ZooKeeperTestHarness
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import scala.collection._
@@ -30,19 +29,17 @@
 import kafka.message._
 import kafka.serializer.StringDecoder
 
-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
   val zookeeperConnect = TestZKUtils.zookeeperConnect
-  val zkConnect = zookeeperConnect
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
   val configs =
     for(props <- TestUtils.createBrokerConfigs(numNodes))
     yield new KafkaConfig(props) {
-      override val enableZookeeper = true
-      override val numPartitions = numParts
       override val zkConnect = zookeeperConnect
+      override val numPartitions = numParts
     }
   val group = "group1"
   val consumer0 = "consumer0"
Index: core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala	(working copy)
@@ -25,8 +25,8 @@
 import org.scalatest.junit.JUnit3Suite
 import kafka.javaapi.message.ByteBufferMessageSet
 import kafka.javaapi.ProducerRequest
-import kafka.utils.TestUtils
 import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message}
+import kafka.utils.TestUtils
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -35,9 +35,7 @@
   
   val port = 9999
   val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props) {
-                 override val enableZookeeper = false
-               }
+  val config = new KafkaConfig(props)
   val configs = List(config)
   val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler])
 
Index: core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala	(working copy)
@@ -17,42 +17,28 @@
 
 package kafka.javaapi.producer
 
-import junit.framework.{Assert, TestCase}
-import kafka.utils.SystemTime
-import kafka.utils.TestUtils
-import kafka.server.{KafkaServer, KafkaConfig}
-import org.apache.log4j.{Logger, Level}
-import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Before, Test}
+import junit.framework.Assert
+import kafka.server.KafkaConfig
+import org.apache.log4j.Logger
 import java.util.Properties
 import kafka.producer.SyncProducerConfig
 import kafka.javaapi.message.ByteBufferMessageSet
 import kafka.javaapi.ProducerRequest
 import kafka.message.{NoCompressionCodec, Message}
+import kafka.integration.KafkaServerTestHarness
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{SystemTime, TestUtils}
 
-class SyncProducerTest extends JUnitSuite {
+class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
-  private var server: KafkaServer = null
   val simpleProducerLogger = Logger.getLogger(classOf[kafka.producer.SyncProducer])
+  val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1).head))
+  val zookeeperConnect = zkConnect
 
-  @Before
-  def setUp() {
-    server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0, 9092))
-    {
-      override val enableZookeeper = false
-    })
-  }
-
-  @After
-  def tearDown() {
-    server.shutdown
-  }
-
-  @Test
   def testReachableServer() {
     val props = new Properties()
     props.put("host", "localhost")
-    props.put("port", "9092")
+    props.put("port", servers.head.socketServer.port.toString)
     props.put("buffer.size", "102400")
     props.put("connect.timeout.ms", "500")
     props.put("reconnect.interval", "1000")
Index: core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala	(working copy)
@@ -21,10 +21,8 @@
 import org.apache.log4j.{Logger, Level}
 import kafka.server.{KafkaRequestHandler, KafkaServer, KafkaConfig}
 import kafka.zk.EmbeddedZookeeper
-import kafka.utils.{TestZKUtils, TestUtils}
 import org.junit.{After, Before, Test}
 import junit.framework.Assert
-import collection.mutable.HashMap
 import org.easymock.EasyMock
 import kafka.utils._
 import java.util.concurrent.ConcurrentHashMap
@@ -34,7 +32,7 @@
 import kafka.producer.{SyncProducerConfig, Partitioner, ProducerConfig, DefaultPartitioner}
 import kafka.producer.ProducerPool
 import kafka.javaapi.message.ByteBufferMessageSet
-import kafka.producer.async.{AsyncProducer, AsyncProducerConfig}
+import kafka.producer.async.AsyncProducer
 import kafka.javaapi.Implicits._
 import kafka.serializer.{StringEncoder, Encoder}
 import kafka.javaapi.consumer.SimpleConsumer
Index: core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala	(working copy)
@@ -18,7 +18,6 @@
 package kafka.javaapi.message
 
 import java.nio._
-import junit.framework.TestCase
 import junit.framework.Assert._
 import org.junit.Test
 import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
Index: core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala	(working copy)
@@ -35,7 +35,6 @@
 
   @Test
   def testWrittenEqualsRead {
-    import scala.collection.JavaConversions._
     val messageSet = createMessageSet(messages)
     TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet))
   }
Index: core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala	(revision 1234984)
+++ core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala	(working copy)
@@ -18,11 +18,10 @@
 package kafka.javaapi.consumer
 
 import junit.framework.Assert._
-import kafka.zk.ZooKeeperTestHarness
 import kafka.integration.KafkaServerTestHarness
 import kafka.server._
 import kafka.utils.{Utils, Logging}
-import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.utils.TestUtils
 import org.scalatest.junit.JUnit3Suite
 import scala.collection.JavaConversions._
 import kafka.javaapi.message.ByteBufferMessageSet
@@ -30,17 +29,15 @@
 import org.apache.log4j.{Level, Logger}
 import kafka.message._
 
-class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness with Logging {
+class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
-  val zookeeperConnect = TestZKUtils.zookeeperConnect
-  val zkConnect = zookeeperConnect
+  val zookeeperConnect = zkConnect
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
   val configs =
     for(props <- TestUtils.createBrokerConfigs(numNodes))
     yield new KafkaConfig(props) {
-      override val enableZookeeper = true
       override val numPartitions = numParts
       override val zkConnect = zookeeperConnect
     }
@@ -57,7 +54,7 @@
     val sentMessages1 = sendMessages(nMessages, "batch1")
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+      TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
Index: core/src/test/scala/other/kafka/TestLogPerformance.scala
===================================================================
--- core/src/test/scala/other/kafka/TestLogPerformance.scala	(revision 1234984)
+++ core/src/test/scala/other/kafka/TestLogPerformance.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.log
 
-import kafka.log._
 import kafka.message._
 import kafka.utils.{TestUtils, Utils}
 
Index: core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
===================================================================
--- core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala	(revision 1234984)
+++ core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala	(working copy)
@@ -17,8 +17,6 @@
 
 package kafka
 
-import java.net.URI
-import java.util.Arrays.asList
 import java.io._
 import java.nio._
 import java.nio.channels._
Index: core/src/test/scala/other/kafka/TestKafkaAppender.scala
===================================================================
--- core/src/test/scala/other/kafka/TestKafkaAppender.scala	(revision 1234984)
+++ core/src/test/scala/other/kafka/TestKafkaAppender.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka
 
 import message.Message
-import org.apache.log4j.{Logger, PropertyConfigurator}
+import org.apache.log4j.PropertyConfigurator
 import kafka.utils.Logging
 import serializer.Encoder
 
Index: core/src/test/resources/log4j.properties
===================================================================
--- core/src/test/resources/log4j.properties	(revision 1234984)
+++ core/src/test/resources/log4j.properties	(working copy)
@@ -12,13 +12,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=WARN, stdout
+log4j.rootLogger=OFF, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=WARN
+log4j.logger.kafka=OFF
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
\ No newline at end of file
+log4j.logger.org.I0Itec.zkclient.ZkClient=OFF
Index: core/src/main/scala/kafka/cluster/Cluster.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Cluster.scala	(revision 1234984)
+++ core/src/main/scala/kafka/cluster/Cluster.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.cluster
 
-import kafka.utils._
 import scala.collection._
 
 /**
Index: core/src/main/scala/kafka/cluster/Broker.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Broker.scala	(revision 1234984)
+++ core/src/main/scala/kafka/cluster/Broker.scala	(working copy)
@@ -17,28 +17,49 @@
 
 package kafka.cluster
 
-import java.util.Arrays
-import kafka.utils._
-import java.net.InetAddress
-import kafka.server.KafkaConfig
-import util.parsing.json.JSON
+import kafka.utils.Utils._
+import java.nio.ByteBuffer
 
 /**
  * A Kafka broker
  */
 private[kafka] object Broker {
+
   def createBroker(id: Int, brokerInfoString: String): Broker = {
+    if(brokerInfoString == null)
+      throw new IllegalArgumentException("Broker id %s does not exist".format(id))
     val brokerInfo = brokerInfoString.split(":")
     new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
   }
+
+  def readFrom(buffer: ByteBuffer): Broker = {
+    val id = buffer.getInt
+    val creatorId = readShortString(buffer)
+    val host = readShortString(buffer)
+    val port = buffer.getInt
+    new Broker(id, creatorId, host, port)
+  }
 }
 
-private[kafka] class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
+private[kafka] case class Broker(val id: Int, val creatorId: String, val host: String, val port: Int) {
   
   override def toString(): String = new String("id:" + id + ",creatorId:" + creatorId + ",host:" + host + ",port:" + port)
 
   def getZKString(): String = new String(creatorId + ":" + host + ":" + port)
-  
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(id)
+    writeShortString(buffer, creatorId)
+    writeShortString(buffer, host)
+    buffer.putInt(port)
+  }
+
+  def sizeInBytes: Int = {
+    val size = shortStringLength(creatorId) + shortStringLength(host) /* host name */ + 4 /* port */ + 4 /* broker id */
+    debug("Size of broker info = " + size)
+    size
+  }
+
   override def equals(obj: Any): Boolean = {
     obj match {
       case null => false
@@ -47,6 +68,6 @@
     }
   }
   
-  override def hashCode(): Int = Utils.hashcode(id, host, port)
+  override def hashCode(): Int = hashcode(id, host, port)
   
 }
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1234984)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -25,6 +25,7 @@
 import kafka.server.{KafkaConfig, KafkaZooKeeper}
 import kafka.common.{InvalidTopicException, InvalidPartitionException}
 import kafka.api.OffsetRequest
+import org.I0Itec.zkclient.ZkClient
 
 /**
  * The guy who creates and hands out logs
@@ -38,15 +39,16 @@
                                 needRecovery: Boolean) extends Logging {
   
   val logDir: File = new File(config.logDir)
+  var kafkaZookeeper = new KafkaZooKeeper(config, this)
+
   private val numPartitions = config.numPartitions
   private val maxSize: Long = config.logFileSize
   private val flushInterval = config.flushInterval
   private val topicPartitionsMap = config.topicPartitionsMap
   private val logCreationLock = new Object
   private val random = new java.util.Random
-  private var kafkaZookeeper: KafkaZooKeeper = null
   private var zkActor: Actor = null
-  private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
+  private val startupLatch: CountDownLatch = new CountDownLatch(1)
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
   private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
@@ -82,29 +84,26 @@
     scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
   }
 
-  if(config.enableZookeeper) {
-    kafkaZookeeper = new KafkaZooKeeper(config, this)
-    kafkaZookeeper.startup
-    zkActor = new Actor {
-      def act() {
-        loop {
-          receive {
-            case topic: String =>
-              try {
-                kafkaZookeeper.registerTopicInZk(topic)
-              }
-              catch {
-                case e => error(e) // log it and let it go
-              }
-            case StopActor =>
-              info("zkActor stopped")
-              exit
-          }
+  kafkaZookeeper.startup
+  zkActor = new Actor {
+    def act() {
+      loop {
+        receive {
+          case topic: String =>
+            try {
+              kafkaZookeeper.registerTopicInZk(topic)
+            }
+            catch {
+              case e => error(e) // log it and let it go
+            }
+          case StopActor =>
+            info("zkActor stopped")
+            exit
         }
       }
     }
-    zkActor.start
   }
+  zkActor.start
 
   case object StopActor
 
@@ -119,24 +118,20 @@
    *  Register this broker in ZK for the first time.
    */
   def startup() {
-    if(config.enableZookeeper) {
-      kafkaZookeeper.registerBrokerInZk()
-      for (topic <- getAllTopics)
-        kafkaZookeeper.registerTopicInZk(topic)
-      startupLatch.countDown
-    }
+    kafkaZookeeper.registerBrokerInZk()
+    for (topic <- getAllTopics)
+      kafkaZookeeper.registerTopicInZk(topic)
+    startupLatch.countDown
     info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervalMap)
     logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
   }
 
   private def awaitStartup() {
-    if (config.enableZookeeper)
-      startupLatch.await
+    startupLatch.await
   }
 
   private def registerNewTopicInZK(topic: String) {
-    if (config.enableZookeeper)
-      zkActor ! topic 
+    zkActor ! topic
   }
 
   /**
@@ -288,10 +283,8 @@
     val iter = getLogIterator
     while(iter.hasNext)
       iter.next.close()
-    if (config.enableZookeeper) {
-      zkActor ! StopActor
-      kafkaZookeeper.close
-    }
+    zkActor ! StopActor
+    kafkaZookeeper.close
   }
   
   private def getLogIterator(): Iterator[Log] = {
@@ -345,4 +338,7 @@
 
   def getAllTopics(): Iterator[String] = logs.keys.iterator
   def getTopicPartitionsMap() = topicPartitionsMap
+
+  def getServerConfig: KafkaConfig = config
+  def getZookeeperClient: ZkClient = kafkaZookeeper.zkClient
 }
Index: core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala	(revision 1234984)
+++ core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala	(working copy)
@@ -18,7 +18,7 @@
 
 import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
 import collection.mutable.HashMap
-import collection.mutable.Map
+import collection.immutable.Map
 import kafka.utils.Logging
 import collection.immutable.TreeSet
 import kafka.cluster.{Broker, Partition}
@@ -188,13 +188,9 @@
    * @return a mapping from brokerId to (host, port)
    */
   private def getZKBrokerInfo(): Map[Int, Broker] = {
-    val brokers = new HashMap[Int, Broker]()
     val allBrokerIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath).map(bid => bid.toInt)
-    allBrokerIds.foreach { bid =>
-      val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
-      brokers += (bid -> Broker.createBroker(bid, brokerInfo))
-    }
-    brokers
+    val brokers = ZkUtils.getBrokerInfoFromIds(zkClient, allBrokerIds)
+    allBrokerIds.zip(brokers).toMap
   }
 
   /**
Index: core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala	(revision 1234984)
+++ core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala	(working copy)
@@ -17,7 +17,7 @@
 package kafka.producer
 
 import collection.mutable.HashMap
-import collection.mutable.Map
+import collection.immutable.Map
 import collection.SortedSet
 import kafka.cluster.{Broker, Partition}
 import kafka.common.InvalidConfigException
@@ -90,7 +90,7 @@
       brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1),
         brokerIdHostPort(1), brokerIdHostPort(2).toInt))
     }
-    brokerInfo
+    brokerInfo.toMap
   }
 
 }
Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/ConsoleProducer.scala	(revision 1234984)
+++ core/src/main/scala/kafka/producer/ConsoleProducer.scala	(working copy)
@@ -18,14 +18,10 @@
 package kafka.producer
 
 import scala.collection.JavaConversions._
-import org.I0Itec.zkclient._
 import joptsimple._
-import java.util.Arrays.asList
 import java.util.Properties
-import java.util.Random
 import java.io._
 import kafka.message._
-import kafka.utils._
 import kafka.serializer._
 
 object ConsoleProducer { 
Index: core/src/main/scala/kafka/producer/SyncProducerConfig.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(revision 1234984)
+++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala	(working copy)
@@ -19,7 +19,6 @@
 
 import kafka.utils.Utils
 import java.util.Properties
-import kafka.message.{CompressionUtils, CompressionCodec}
 
 class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
   /** the broker to which the producer sends events */
Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(revision 1234984)
+++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(working copy)
@@ -16,7 +16,7 @@
 */
 package kafka.producer
 
-import collection.mutable.Map
+import collection.immutable.Map
 import collection.SortedSet
 import kafka.cluster.{Broker, Partition}
 
Index: core/src/main/scala/kafka/producer/async/AsyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/AsyncProducer.scala	(revision 1234984)
+++ core/src/main/scala/kafka/producer/async/AsyncProducer.scala	(working copy)
@@ -106,29 +106,26 @@
 
     if(!added) {
       AsyncProducerStats.recordDroppedEvents
-      logger.error("Event queue is full of unsent messages, could not send event: " + event.toString)
+      error("Event queue is full of unsent messages, could not send event: " + event.toString)
       throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + event.toString)
     }else {
-      if(logger.isTraceEnabled) {
-        logger.trace("Added event to send queue for topic: " + topic + ", partition: " + partition + ":" + event.toString)
-        logger.trace("Remaining queue size: " + queue.remainingCapacity)
-      }
+      trace("Added event to send queue for topic: " + topic + ", partition: " + partition + ":" + event.toString)
+      trace("Remaining queue size: " + queue.remainingCapacity)
     }
   }
 
   def close = {
     if(cbkHandler != null) {
       cbkHandler.close
-      logger.info("Closed the callback handler")
+      info("Closed the callback handler")
     }
     closed.set(true)
     queue.put(new QueueItem(AsyncProducer.Shutdown.asInstanceOf[T], null, -1))
-    if(logger.isDebugEnabled)
-      logger.debug("Added shutdown command to the queue")
+    debug("Added shutdown command to the queue")
     sendThread.shutdown
     sendThread.awaitShutdown
     producer.close
-    logger.info("Closed AsyncProducer")
+    info("Closed AsyncProducer")
   }
 
   // for testing only
Index: core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
===================================================================
--- core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala	(revision 1234984)
+++ core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala	(working copy)
@@ -19,7 +19,7 @@
 
 import async.MissingConfigException
 import org.apache.log4j.spi.LoggingEvent
-import org.apache.log4j.{Logger, AppenderSkeleton}
+import org.apache.log4j.AppenderSkeleton
 import kafka.utils.{Utils, Logging}
 import kafka.serializer.Encoder
 import java.util.{Properties, Date}
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1234984)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.message
 
-import scala.collection.mutable
 import kafka.utils.Logging
 import kafka.common.{InvalidMessageSizeException, ErrorMapping}
 import java.nio.ByteBuffer
Index: core/src/main/scala/kafka/message/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/FileMessageSet.scala	(revision 1234984)
+++ core/src/main/scala/kafka/message/FileMessageSet.scala	(working copy)
@@ -22,8 +22,6 @@
 import java.nio.channels._
 import java.util.concurrent.atomic._
 
-import kafka._
-import kafka.message._
 import kafka.utils._
 
 /**
Index: core/src/main/scala/kafka/message/CompressionUtils.scala
===================================================================
--- core/src/main/scala/kafka/message/CompressionUtils.scala	(revision 1234984)
+++ core/src/main/scala/kafka/message/CompressionUtils.scala	(working copy)
@@ -54,8 +54,8 @@
 }
 
 class SnappyCompression(inputStream: InputStream,outputStream: ByteArrayOutputStream)  extends CompressionFacade(inputStream,outputStream) {
-  import org.xerial.snappy.{SnappyInputStream}
-  import org.xerial.snappy.{SnappyOutputStream}
+  import org.xerial.snappy.SnappyInputStream
+  import org.xerial.snappy.SnappyOutputStream
   
   val snappyIn:SnappyInputStream = if (inputStream == null) null else new SnappyInputStream(inputStream)
   val snappyOut:SnappyOutputStream = if (outputStream == null) null else new  SnappyOutputStream(outputStream)
Index: core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala	(revision 1234984)
+++ core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala	(working copy)
@@ -19,7 +19,6 @@
 
 import java.io.InputStream
 import java.nio.ByteBuffer
-import scala.Math
 
 class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
   override def read():Int  = {
Index: core/src/main/scala/kafka/message/Message.scala
===================================================================
--- core/src/main/scala/kafka/message/Message.scala	(revision 1234984)
+++ core/src/main/scala/kafka/message/Message.scala	(working copy)
@@ -18,9 +18,6 @@
 package kafka.message
 
 import java.nio._
-import java.nio.channels._
-import java.util.zip.CRC32
-import java.util.UUID
 import kafka.utils._
 import kafka.common.UnknownMagicByteException
 
Index: core/src/main/scala/kafka/common/ErrorMapping.scala
===================================================================
--- core/src/main/scala/kafka/common/ErrorMapping.scala	(revision 1234984)
+++ core/src/main/scala/kafka/common/ErrorMapping.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.common
 
-import kafka.consumer._
 import kafka.message.InvalidMessageException
 import java.nio.ByteBuffer
 import java.lang.Throwable
Index: core/src/main/scala/kafka/common/KafkaZookeeperClient.scala
===================================================================
--- core/src/main/scala/kafka/common/KafkaZookeeperClient.scala	(revision 0)
+++ core/src/main/scala/kafka/common/KafkaZookeeperClient.scala	(revision 0)
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.common
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZKStringSerializer, ZKConfig}
+import java.util.concurrent.atomic.AtomicReference
+
+object KafkaZookeeperClient {
+  private val INSTANCE = new AtomicReference[ZkClient](null)
+
+  def getZookeeperClient(config: ZKConfig): ZkClient = {
+    // TODO: This cannot be a singleton since unit tests break if we do that
+//    INSTANCE.compareAndSet(null, new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+//                                              ZKStringSerializer))
+    INSTANCE.set(new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+                                              ZKStringSerializer))
+    INSTANCE.get()
+  }
+}
Index: core/src/main/scala/kafka/admin/CreateTopicCommand.scala
===================================================================
--- core/src/main/scala/kafka/admin/CreateTopicCommand.scala	(revision 1234984)
+++ core/src/main/scala/kafka/admin/CreateTopicCommand.scala	(working copy)
@@ -44,9 +44,10 @@
                            .describedAs("replication factor")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1)
-    val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manuallly assigning replicas to brokers")
+    val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "for manually assigning replicas to brokers")
                            .withRequiredArg
-                           .describedAs("broker_id_for_part1_replica1:broker_id_for_part1_replica2,broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...")
+                           .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , " +
+                                        "broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...")
                            .ofType(classOf[String])
                            .defaultsTo("")
 
@@ -68,14 +69,7 @@
     var zkClient: ZkClient = null
     try {
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      val brokerList = ZkUtils.getSortedBrokerList(zkClient)
-      var replicaAssignment: Seq[List[String]] = null
-
-      if (replicaAssignmentStr == "")
-        replicaAssignment = AdminUtils.assginReplicasToBrokers(brokerList, nPartitions, replicationFactor)
-      else
-        replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
-      AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
+      createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
       println("creation succeeded!")
     }
     catch {
@@ -89,11 +83,22 @@
     }
   }
 
+  def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
+    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+    var replicaAssignment: Seq[List[String]] = null
+
+    if (replicaAssignmentStr == "")
+      replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
+    else
+      replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
+    AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
+  }
+
   def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Array[List[String]] = {
     val partitionList = replicaAssignmentList.split(",")
     val ret = new Array[List[String]](partitionList.size)
     for (i <- 0 until partitionList.size) {
-      val brokerList = partitionList(i).split(":")
+      val brokerList = partitionList(i).split(":").map(s => s.trim())
       if (brokerList.size <= 0)
         throw new AdministrationException("replication factor must be larger than 0")
       if (brokerList.size != brokerList.toSet.size)
Index: core/src/main/scala/kafka/admin/AdminUtils.scala
===================================================================
--- core/src/main/scala/kafka/admin/AdminUtils.scala	(revision 1234984)
+++ core/src/main/scala/kafka/admin/AdminUtils.scala	(working copy)
@@ -20,9 +20,12 @@
 import java.util.Random
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import kafka.utils.{SystemTime, Utils, ZkUtils}
+import kafka.api.{TopicMetadata, PartitionMetadata}
+import kafka.utils.{Logging, SystemTime, Utils, ZkUtils}
+import kafka.cluster.Broker
+import collection.mutable.HashMap
 
-object AdminUtils {
+object AdminUtils extends Logging {
   val rand = new Random
 
   /**
@@ -43,7 +46,7 @@
    * p3        p4        p0        p1        p2       (3nd replica)
    * p7        p8        p9        p5        p6       (3nd replica)
    */
-  def assginReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
+  def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
           fixedStartIndex: Int = -1)  // for testing only
     : Array[List[String]] = {
     if (nPartitions <= 0)
@@ -75,8 +78,9 @@
       val topicVersion = SystemTime.milliseconds
       ZkUtils.createPersistentPath(zkClient, ZkUtils.BrokerTopicsPath + "/" + topic, topicVersion.toString)
       for (i <- 0 until replicaAssignmentList.size) {
-        val zkPath = ZkUtils.getTopicPartReplicasPath(topic, i.toString)
+        val zkPath = ZkUtils.getTopicPartitionReplicasPath(topic, i.toString)
         ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
+        debug("Updated path %s with %s for replica assignment".format(zkPath, Utils.seqToCSV(replicaAssignmentList(i))))
       }
     }
     catch {
@@ -88,26 +92,50 @@
     }
   }
 
-  def getTopicMetaDataFromZK(topic: String, zkClient: ZkClient): Option[Seq[PartitionMetaData]] = {
-    if (!ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
-      return None
-    
-    val topicPartitionsPath = ZkUtils.getTopicPartsPath(topic)
-    val partitions = ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).sortWith((s,t) => s.toInt < t.toInt)
-    val ret = new Array[PartitionMetaData](partitions.size)
-    for (i <-0 until ret.size) {
-      val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartReplicasPath(topic, partitions(i)))
-      val inSync = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartInSyncPath(topic, partitions(i)))
-      val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartLeaderPath(topic, partitions(i)))
-      ret(i) = new PartitionMetaData(partitions(i),
-                                     Utils.getCSVList(replicas).toList,
-                                     Utils.getCSVList(inSync).toList,
-                                     if (leader == null) None else Some(leader)
-                                    )
+  def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[Option[TopicMetadata]] = {
+    val cachedBrokerInfo = new HashMap[Int, Broker]()
+
+    val metadataList = topics.map { topic =>
+      if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
+        val partitions = ZkUtils.getSortedPartitionIdsForTopic(zkClient, topic)
+        val partitionMetadata = new Array[PartitionMetadata](partitions.size)
+
+        for (i <-0 until partitionMetadata.size) {
+          val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
+          val inSyncReplicas = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitions(i).toString))
+          val leader = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitions(i).toString))
+          debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
+
+          partitionMetadata(i) = new PartitionMetadata(partitions(i),
+            if (leader == null) None else Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(leader.toInt)).head),
+            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
+            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(inSyncReplicas).map(id => id.toInt)),
+            None /* Log segment metadata will be returned here once getOffsetsBefore is replaced with this API */)
+        }
+
+        Some(new TopicMetadata(topic, partitionMetadata))
+      } else
+        None
     }
-    Some(ret)
+
+    metadataList.toList
   }
 
+  private def getBrokerInfoFromCache(zkClient: ZkClient,
+                                     cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
+                                     brokerIds: Seq[Int]): Seq[Broker] = {
+    brokerIds.map { id =>
+      val optionalBrokerInfo = cachedBrokerInfo.get(id)
+      optionalBrokerInfo match {
+        case Some(brokerInfo) => brokerInfo // return broker info from the cache
+        case None => // fetch it from zookeeper
+          val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(id)).head
+          cachedBrokerInfo += (id -> brokerInfo)
+          brokerInfo
+      }
+    }
+  }
+
   private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
     val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
     (firstReplicaIndex + shift) % nBrokers
Index: core/src/main/scala/kafka/admin/ListTopicCommand.scala
===================================================================
--- core/src/main/scala/kafka/admin/ListTopicCommand.scala	(revision 1234984)
+++ core/src/main/scala/kafka/admin/ListTopicCommand.scala	(working copy)
@@ -76,13 +76,13 @@
   }
 
   def showTopic(topic: String, zkClient: ZkClient) {
-    val topicMetaData = AdminUtils.getTopicMetaDataFromZK(topic, zkClient)
+    val topicMetaData = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
     topicMetaData match {
       case None =>
         println("topic " + topic + " doesn't exist!")
       case Some(tmd) =>
         println("topic: " + topic)
-        for (part <- tmd)
+        for (part <- tmd.partitionsMetadata)
           println(part.toString)
     }
   }
Index: core/src/main/scala/kafka/admin/PartitionMetaData.scala
===================================================================
--- core/src/main/scala/kafka/admin/PartitionMetaData.scala	(revision 1234984)
+++ core/src/main/scala/kafka/admin/PartitionMetaData.scala	(working copy)
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-class PartitionMetaData(val partitionId: String,
-                        val replicaList: Seq[String],
-                        val inSyncList: Seq[String],
-                        val leaderId: Option[String]) {
-
-  override def toString(): String = {
-    val builder = new StringBuilder
-    builder.append("partition id: " + partitionId)
-    builder.append(" replica list: " + replicaList.mkString(","))
-    builder.append(" in-sync list: " + inSyncList.mkString(","))
-    builder.append(" leader: " + leaderId)
-    builder.toString
-  }
-
-}
\ No newline at end of file
Index: core/src/main/scala/kafka/network/SocketServerStats.scala
===================================================================
--- core/src/main/scala/kafka/network/SocketServerStats.scala	(revision 1234984)
+++ core/src/main/scala/kafka/network/SocketServerStats.scala	(working copy)
@@ -17,8 +17,6 @@
 
 package kafka.network
 
-import java.util.concurrent.atomic._
-import javax.management._
 import kafka.utils._
 import kafka.api.RequestKeys
 
Index: core/src/main/scala/kafka/network/SocketServer.scala
===================================================================
--- core/src/main/scala/kafka/network/SocketServer.scala	(revision 1234984)
+++ core/src/main/scala/kafka/network/SocketServer.scala	(working copy)
@@ -21,7 +21,6 @@
 import java.util.concurrent.atomic._
 import java.net._
 import java.io._
-import java.nio._
 import java.nio.channels._
 
 import kafka.utils._
Index: core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
===================================================================
--- core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala	(revision 1234984)
+++ core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala	(working copy)
@@ -26,7 +26,7 @@
  * 
  */
 @nonthreadsafe
-private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive {
+private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging {
   
   private val sizeBuffer: ByteBuffer = ByteBuffer.allocate(4)
   private var contentBuffer: ByteBuffer = null
@@ -81,7 +81,7 @@
     }
     catch {
       case e: OutOfMemoryError => {
-        logger.error("OOME with size " + size, e)
+        error("OOME with size " + size, e)
         throw e
       }
       case e2 =>
Index: core/src/main/scala/kafka/network/RequestChannel.scala
===================================================================
--- core/src/main/scala/kafka/network/RequestChannel.scala	(revision 1234984)
+++ core/src/main/scala/kafka/network/RequestChannel.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.network
 
-import java.util.ArrayList
 import java.util.concurrent._
 
 object RequestChannel { 
Index: core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ConsoleConsumer.scala	(revision 1234984)
+++ core/src/main/scala/kafka/consumer/ConsoleConsumer.scala	(working copy)
@@ -21,13 +21,11 @@
 import scala.collection.JavaConversions._
 import org.I0Itec.zkclient._
 import joptsimple._
-import java.util.Arrays.asList
 import java.util.Properties
 import java.util.Random
 import java.io.PrintStream
 import kafka.message._
 import kafka.utils.{Utils, Logging}
-import kafka.utils.ZkUtils
 import kafka.utils.ZKStringSerializer
 
 /**
Index: core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
===================================================================
--- core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala	(revision 1234984)
+++ core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.consumer
 
 import scala.collection.JavaConversions._
-import kafka.utils.{Utils, ZkUtils, ZKStringSerializer, Logging}
+import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.server.KafkaServerStartable
Index: core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala
===================================================================
--- core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala	(revision 1234984)
+++ core/src/main/scala/kafka/consumer/storage/OracleOffsetStorage.scala	(working copy)
@@ -54,8 +54,7 @@
     } finally {
       commitOrRollback(connection, success)
     }
-    if(logger.isDebugEnabled)
-      logger.debug("Updated node " + node + " for topic '" + topic + "' to " + offset)
+    debug("Updated node " + node + " for topic '" + topic + "' to " + offset)
   }
   
   def close() {
Index: core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(revision 1234984)
+++ core/src/main/scala/kafka/tools/SimpleConsumerShell.scala	(working copy)
@@ -22,7 +22,6 @@
 import kafka.api.FetchRequest
 import kafka.utils._
 import kafka.consumer._
-import kafka.server._
 
 /**
  * Command line program to dump out messages to standard out using the simple consumer
Index: core/src/main/scala/kafka/tools/ReplayLogProducer.scala
===================================================================
--- core/src/main/scala/kafka/tools/ReplayLogProducer.scala	(revision 1234984)
+++ core/src/main/scala/kafka/tools/ReplayLogProducer.scala	(working copy)
@@ -17,19 +17,17 @@
 
 package kafka.tools
 
-import java.io.File
 import joptsimple.OptionParser
-import org.apache.log4j.Logger
 import java.util.concurrent.{Executors, CountDownLatch}
 import java.util.Properties
 import kafka.producer.async.DefaultEventHandler
-import kafka.serializer.{DefaultEncoder, StringEncoder}
+import kafka.serializer.DefaultEncoder
 import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
 import kafka.consumer._
-import kafka.utils.{ZKStringSerializer, Utils, Logging}
+import kafka.utils.{ZKStringSerializer, Logging}
 import kafka.api.OffsetRequest
 import org.I0Itec.zkclient._
-import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
+import kafka.message.{CompressionCodec, Message}
 
 object ReplayLogProducer extends Logging {
 
Index: core/src/main/scala/kafka/tools/ProducerShell.scala
===================================================================
--- core/src/main/scala/kafka/tools/ProducerShell.scala	(revision 1234984)
+++ core/src/main/scala/kafka/tools/ProducerShell.scala	(working copy)
@@ -17,12 +17,9 @@
 
 package kafka.tools
 
-import java.net.URI
 import java.io._
 import joptsimple._
-import kafka.message._
 import kafka.producer._
-import java.util.Properties
 import kafka.utils.Utils
 
 /**
Index: core/src/main/scala/kafka/tools/JmxTool.scala
===================================================================
--- core/src/main/scala/kafka/tools/JmxTool.scala	(revision 1234984)
+++ core/src/main/scala/kafka/tools/JmxTool.scala	(working copy)
@@ -22,7 +22,7 @@
 import java.text.SimpleDateFormat
 import javax.management._
 import javax.management.remote._
-import joptsimple.{OptionSet, OptionParser}
+import joptsimple.OptionParser
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.math._
Index: core/src/main/scala/kafka/utils/Utils.scala
===================================================================
--- core/src/main/scala/kafka/utils/Utils.scala	(revision 1234984)
+++ core/src/main/scala/kafka/utils/Utils.scala	(working copy)
@@ -23,7 +23,6 @@
 import java.util.concurrent.atomic._
 import java.lang.management._
 import java.util.zip.CRC32
-import org.apache.log4j.Logger
 import javax.management._
 import java.util.Properties
 import scala.collection._
@@ -115,7 +114,7 @@
    * @param buffer The buffer to read from
    * @param encoding The encoding in which to read the string
    */
-  def readShortString(buffer: ByteBuffer, encoding: String): String = {
+  def readShortString(buffer: ByteBuffer, encoding: String = "UTF-8"): String = {
     val size: Int = buffer.getShort()
     if(size < 0)
       return null
@@ -130,7 +129,7 @@
    * @param string The string to write
    * @param encoding The encoding in which to write the string
    */
-  def writeShortString(buffer: ByteBuffer, string: String, encoding: String): Unit = {
+  def writeShortString(buffer: ByteBuffer, string: String, encoding: String = "UTF-8"): Unit = {
     if(string == null) {
       buffer.putShort(-1)
     } else if(string.length > Short.MaxValue) {
@@ -142,6 +141,24 @@
   }
   
   /**
+   * Return the total size of a size-prefixed string, where the size is stored as a 2 byte short
+   * @param string The string whose size-prefixed encoded length is computed (null counts as just the 2 byte size)
+   * @param encoding The encoding used to compute the string's encoded length
+   */
+  def shortStringLength(string: String, encoding: String = "UTF-8"): Int = {
+    if(string == null) {
+      2
+    } else {
+      val encodedString = string.getBytes(encoding)
+      if(encodedString.length > Short.MaxValue) {
+        throw new IllegalArgumentException("String exceeds the maximum size of " + Short.MaxValue + ".")
+      } else {
+        2 + encodedString.length
+      }
+    }
+  }
+
+  /**
    * Read a properties file from the given path
    * @param filename The path of the file to read
    */
@@ -193,7 +210,28 @@
     else
       v
   }
-  
+
+  def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
+    val value = buffer.getInt
+    if(value < range._1 || value > range._2)
+      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+    else value
+  }
+
+  def getShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = {
+    val value = buffer.getShort
+    if(value < range._1 || value > range._2)
+      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+    else value
+  }
+
+  def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
+    val value = buffer.getLong
+    if(value < range._1 || value > range._2)
+      throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+    else value
+  }
+
   /**
    * Read a boolean value from the properties instance
    * @param props The properties to read from
@@ -618,7 +656,7 @@
   def tryCleanupZookeeper(zkUrl: String, groupId: String) {
     try {
       val dir = "/consumers/" + groupId
-      logger.info("Cleaning up temporary zookeeper data under " + dir + ".")
+      info("Cleaning up temporary zookeeper data under " + dir + ".")
       val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
       zk.deleteRecursive(dir)
       zk.close()
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1234984)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -33,34 +33,49 @@
     BrokerTopicsPath + "/" + topic
   }
 
-  def getTopicPartsPath(topic: String): String ={
+  def getTopicPartitionsPath(topic: String): String ={
     getTopicPath(topic) + "/" + "partitions"
   }
 
-  def getTopicPartPath(topic: String, partitionId: String): String ={
-    getTopicPartsPath(topic) + "/" + partitionId
+  def getTopicPartitionPath(topic: String, partitionId: String): String ={
+    getTopicPartitionsPath(topic) + "/" + partitionId
   }
 
   def getTopicVersion(zkClient: ZkClient, topic: String): String ={
     readDataMaybeNull(zkClient, getTopicPath(topic))
   }
 
-  def getTopicPartReplicasPath(topic: String, partitionId: String): String ={
-    getTopicPartPath(topic, partitionId) + "/" + "replicas"
+  def getTopicPartitionReplicasPath(topic: String, partitionId: String): String ={
+    getTopicPartitionPath(topic, partitionId) + "/" + "replicas"
   }
 
-  def getTopicPartInSyncPath(topic: String, partitionId: String): String ={
-    getTopicPartPath(topic, partitionId) + "/" + "isr"
+  def getTopicPartitionInSyncPath(topic: String, partitionId: String): String ={
+    getTopicPartitionPath(topic, partitionId) + "/" + "isr"
   }
 
-  def getTopicPartLeaderPath(topic: String, partitionId: String): String ={
-    getTopicPartPath(topic, partitionId) + "/" + "leader"
+  def getTopicPartitionLeaderPath(topic: String, partitionId: String): String ={
+    getTopicPartitionPath(topic, partitionId) + "/" + "leader"
   }
 
   def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
       ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
   }
 
+  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
+    val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
+    val broker = new Broker(id, creator, host, port)
+    try {
+      createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
+    } catch {
+      case e: ZkNodeExistsException =>
+        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " +
+                                   "indicates that you either have configured a brokerid that is already in use, or " +
+                                   "else you have shutdown this broker and restarted it faster than the zookeeper " +
+                                   "timeout so it appears to be re-registering.")
+    }
+    info("Registering broker " + brokerIdPath + " succeeded with " + broker)
+  }
+
   /**
    *  make sure a persistent path exists in ZK. Create the path if not exist.
    */
@@ -283,6 +298,17 @@
     val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
     zkClient.delete(brokerPartTopicPath)
   }
+
+  /**
+   * For a given topic, this returns the sorted list of partition ids registered for this topic
+   */
+  def getSortedPartitionIdsForTopic(zkClient: ZkClient, topic: String): Seq[Int] = {
+    val topicPartitionsPath = ZkUtils.getTopicPartitionsPath(topic)
+    ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).map(pid => pid.toInt).sortWith((s,t) => s < t)
+  }
+
+  def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
+    brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )
 }
 
 object ZKStringSerializer extends ZkSerializer {
Index: core/src/main/scala/kafka/utils/KafkaScheduler.scala
===================================================================
--- core/src/main/scala/kafka/utils/KafkaScheduler.scala	(revision 1234984)
+++ core/src/main/scala/kafka/utils/KafkaScheduler.scala	(working copy)
@@ -19,7 +19,6 @@
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import kafka.utils._
 
 /**
  * A scheduler for running jobs in the background
Index: core/src/main/scala/kafka/utils/Range.scala
===================================================================
--- core/src/main/scala/kafka/utils/Range.scala	(revision 1234984)
+++ core/src/main/scala/kafka/utils/Range.scala	(working copy)
@@ -17,9 +17,8 @@
 
 package kafka.utils
 
-import scala.math._
 
-/** 
+/**
  * A generic range value with a start and end 
  */
 trait Range {
Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(revision 1234984)
+++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(working copy)
@@ -18,12 +18,11 @@
 package kafka.server
 
 import kafka.utils._
-import kafka.cluster.Broker
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import kafka.log.LogManager
 import java.net.InetAddress
+import kafka.common.KafkaZookeeperClient
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -41,7 +40,7 @@
   def startup() {
     /* start client */
     info("connecting to ZK: " + config.zkConnect)
-    zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
+    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
     zkClient.subscribeStateChanges(new SessionExpireListener)
   }
 
@@ -49,17 +48,7 @@
     info("Registering broker " + brokerIdPath)
     val hostName = if (config.hostName == null) InetAddress.getLocalHost.getHostAddress else config.hostName
     val creatorId = hostName + "-" + System.currentTimeMillis
-    val broker = new Broker(config.brokerId, creatorId, hostName, config.port)
-    try {
-      ZkUtils.createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
-    } catch {
-      case e: ZkNodeExistsException =>
-        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + 
-                                   "indicates that you either have configured a brokerid that is already in use, or " + 
-                                   "else you have shutdown this broker and restarted it faster than the zookeeper " + 
-                                   "timeout so it appears to be re-registering.")
-    }
-    info("Registering broker " + brokerIdPath + " succeeded with " + broker)
+    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
   }
 
   def registerTopicInZk(topic: String) {
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1234984)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -79,9 +79,6 @@
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
   
-  /* enable zookeeper registration in the server */
-  val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)
-
   /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */
   val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))
 
@@ -93,4 +90,10 @@
 
    /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
   val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))
+
+  /* enable auto creation of topic on the server */
+  val autoCreateTopics = Utils.getBoolean(props, "auto.create.topics", true)
+
+  /* default replication factors for automatically created topics */
+  val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
 }
Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(revision 1234984)
+++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(working copy)
@@ -17,9 +17,6 @@
 
 package kafka.server
 
-import java.util.concurrent._
-import java.util.concurrent.atomic._
-import org.apache.log4j.Logger
 import kafka.network._
 import kafka.utils._
 
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1234984)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -17,13 +17,11 @@
 
 package kafka.server
 
-import scala.reflect.BeanProperty
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import java.io.File
-import org.apache.log4j.Logger
 import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler, Logging}
-import kafka.network.{SocketServerStats, SocketServer, RequestChannel}
+import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 
 /**
Index: core/src/main/scala/kafka/server/MultiMessageSetSend.scala
===================================================================
--- core/src/main/scala/kafka/server/MultiMessageSetSend.scala	(revision 1234984)
+++ core/src/main/scala/kafka/server/MultiMessageSetSend.scala	(working copy)
@@ -17,10 +17,7 @@
 
 package kafka.server
 
-import java.nio._
-import java.nio.channels._
 import kafka.network._
-import kafka.message._
 import kafka.utils._
 
 /**
Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(revision 1234984)
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(working copy)
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import org.apache.log4j.Logger
-import kafka.log._
-import kafka.network._
-import kafka.message._
-import kafka.api._
-import kafka.common.ErrorMapping
-import kafka.utils.SystemTime
-import kafka.utils.Logging
-import java.io.IOException
-
-/**
- * Logic to handle the various Kafka requests
- */
-private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Logging {
-  
-  private val requestLogger = Logger.getLogger("kafka.request.logger")
-
-  def handlerFor(requestTypeId: Short, request: Receive): Handler.Handler = {
-    requestTypeId match {
-      case RequestKeys.Produce => handleProducerRequest _
-      case RequestKeys.Fetch => handleFetchRequest _
-      case RequestKeys.MultiFetch => handleMultiFetchRequest _
-      case RequestKeys.MultiProduce => handleMultiProducerRequest _
-      case RequestKeys.Offsets => handleOffsetRequest _
-      case _ => throw new IllegalStateException("No mapping found for handler id " + requestTypeId)
-    }
-  }
-  
-  def handleProducerRequest(receive: Receive): Option[Send] = {
-    val sTime = SystemTime.milliseconds
-    val request = ProducerRequest.readFrom(receive.buffer)
-
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Producer request " + request.toString)
-    handleProducerRequest(request, "ProduceRequest")
-    debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
-    None
-  }
-
-  def handleMultiProducerRequest(receive: Receive): Option[Send] = {
-    val request = MultiProducerRequest.readFrom(receive.buffer)
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Multiproducer request " + request.toString)
-    request.produces.map(handleProducerRequest(_, "MultiProducerRequest"))
-    None
-  }
-
-  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
-    val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
-    try {
-      logManager.getOrCreateLog(request.topic, partition).append(request.messages)
-      trace(request.messages.sizeInBytes + " bytes written to logs.")
-      request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
-    }
-    catch {
-      case e =>
-        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
-        e match {
-          case _: IOException =>
-            fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
-            Runtime.getRuntime.halt(1)
-          case _ =>
-        }
-        throw e
-    }
-    None
-  }
-
-  def handleFetchRequest(request: Receive): Option[Send] = {
-    val fetchRequest = FetchRequest.readFrom(request.buffer)
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Fetch request " + fetchRequest.toString)
-    Some(readMessageSet(fetchRequest))
-  }
-  
-  def handleMultiFetchRequest(request: Receive): Option[Send] = {
-    val multiFetchRequest = MultiFetchRequest.readFrom(request.buffer)
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Multifetch request")
-    multiFetchRequest.fetches.foreach(req => requestLogger.trace(req.toString))
-    var responses = multiFetchRequest.fetches.map(fetch =>
-        readMessageSet(fetch)).toList
-    
-    Some(new MultiMessageSetSend(responses))
-  }
-
-  private def readMessageSet(fetchRequest: FetchRequest): MessageSetSend = {
-    var  response: MessageSetSend = null
-    try {
-      trace("Fetching log segment for topic, partition, offset, maxSize = " + fetchRequest)
-      val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
-      if (log != null)
-        response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
-      else
-        response = new MessageSetSend()
-    }
-    catch {
-      case e =>
-        error("error when processing request " + fetchRequest, e)
-        response=new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    }
-    response
-  }
-
-  def handleOffsetRequest(request: Receive): Option[Send] = {
-    val offsetRequest = OffsetRequest.readFrom(request.buffer)
-    if(requestLogger.isTraceEnabled)
-      requestLogger.trace("Offset request " + offsetRequest.toString)
-    val offsets = logManager.getOffsets(offsetRequest)
-    val response = new OffsetArraySend(offsets)
-    Some(response)
-  }
-}
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1234984)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -23,9 +23,11 @@
 import kafka.message._
 import kafka.api._
 import kafka.common.ErrorMapping
-import kafka.utils.SystemTime
-import kafka.utils.Logging
 import java.io.IOException
+import kafka.utils.{SystemTime, Logging}
+import collection.mutable.ListBuffer
+import kafka.admin.{CreateTopicCommand, AdminUtils}
+import java.lang.IllegalStateException
 
 /**
  * Logic to handle the various Kafka requests
@@ -42,6 +44,7 @@
         case RequestKeys.MultiFetch => handleMultiFetchRequest(receive)
         case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
         case RequestKeys.Offsets => handleOffsetRequest(receive)
+        case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
         case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
     }
   }
@@ -129,4 +132,38 @@
     val response = new OffsetArraySend(offsets)
     Some(response)
   }
+
+  def handleTopicMetadataRequest(request: Receive): Option[Send] = {
+    val metadataRequest = TopicMetadataRequest.readFrom(request.buffer)
+
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Topic metadata request " + metadataRequest.toString())
+
+    val topicsMetadata = new ListBuffer[TopicMetadata]()
+    val config = logManager.getServerConfig
+    val zkClient = logManager.getZookeeperClient
+    val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+
+    metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
+      val topic = topicAndMetadata._1
+      topicAndMetadata._2 match {
+        case Some(metadata) => topicsMetadata += metadata
+        case None =>
+          /* check if auto creation of topics is turned on */
+          if(config.autoCreateTopics) {
+            CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions,
+              config.defaultReplicationFactor)
+            info("Auto creation of topic %s with partitions %d and replication factor %d is successful!"
+              .format(topic, config.numPartitions, config.defaultReplicationFactor))
+            val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+            newTopicMetadata match {
+              case Some(topicMetadata) => topicsMetadata += topicMetadata
+              case None =>
+                throw new IllegalStateException("Topic metadata for automatically created topic %s does not exist".format(topic))
+            }
+          }
+      }
+    }
+    Some(new TopicMetadataSend(topicsMetadata))
+  }
 }
Index: core/src/main/scala/kafka/api/MultiFetchRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/MultiFetchRequest.scala	(revision 1234984)
+++ core/src/main/scala/kafka/api/MultiFetchRequest.scala	(working copy)
@@ -19,8 +19,6 @@
 
 import java.nio._
 import kafka.network._
-import kafka.utils._
-import kafka.api._
 
 object MultiFetchRequest {
   def readFrom(buffer: ByteBuffer): MultiFetchRequest = {
Index: core/src/main/scala/kafka/api/OffsetRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/OffsetRequest.scala	(revision 1234984)
+++ core/src/main/scala/kafka/api/OffsetRequest.scala	(working copy)
@@ -62,7 +62,7 @@
                     val maxNumOffsets: Int) extends Request(RequestKeys.Offsets) {
 
   def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic, "UTF-8")
+    Utils.writeShortString(buffer, topic)
     buffer.putInt(partition)
     buffer.putLong(time)
     buffer.putInt(maxNumOffsets)
Index: core/src/main/scala/kafka/api/TopicMetadata.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetadata.scala	(revision 0)
+++ core/src/main/scala/kafka/api/TopicMetadata.scala	(revision 0)
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.api
+
+import kafka.cluster.Broker
+import java.nio.ByteBuffer
+import kafka.utils.Utils._
+import collection.mutable.ListBuffer
+
+/**
+ * topic (2 bytes + topic.length)
+ * number of partitions (4 bytes)
+ *
+ * partition id (4 bytes)
+ *
+ * does leader exist (1 byte)
+ * leader info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ * number of replicas (2 bytes)
+ * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ * number of in sync replicas (2 bytes)
+ * replica info (4 + creator.length + host.length + 4 (port) + 4 (id))
+ *
+ * does log metadata exist (1 byte)
+ * number of log segments (4 bytes)
+ * total size of log in bytes (8 bytes)
+ *
+ * number of log segments (4 bytes)
+ * beginning offset (8 bytes)
+ * last modified timestamp (8 bytes)
+ * size of log segment (8 bytes)
+ *
+ */
+
+sealed trait LeaderRequest { def requestId: Byte }
+case object LeaderExists extends LeaderRequest { val requestId: Byte = 1 }
+case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 }
+
+sealed trait LogSegmentMetadataRequest { def requestId: Byte }
+case object LogSegmentMetadataExists extends LogSegmentMetadataRequest { val requestId: Byte = 1 }
+case object LogSegmentMetadataDoesNotExist extends LogSegmentMetadataRequest { val requestId: Byte = 0 }
+
+object TopicMetadata {
+
+  def readFrom(buffer: ByteBuffer): TopicMetadata = {
+    val topic = readShortString(buffer)
+    val numPartitions = getIntInRange(buffer, "number of partitions", (0, Int.MaxValue))
+    val partitionsMetadata = new ListBuffer[PartitionMetadata]()
+    for(i <- 0 until numPartitions)
+      partitionsMetadata += PartitionMetadata.readFrom(buffer)
+    new TopicMetadata(topic, partitionsMetadata)
+  }
+}
+
+case class TopicMetadata(topic: String, partitionsMetadata: Seq[PartitionMetadata]) {
+  def sizeInBytes: Int = {
+    var size: Int = shortStringLength(topic)
+    size += partitionsMetadata.foldLeft(4 /* number of partitions */)(_ + _.sizeInBytes)
+    debug("Size of topic metadata = " + size)
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    /* topic */
+    writeShortString(buffer, topic)
+    /* number of partitions */
+    buffer.putInt(partitionsMetadata.size)
+    partitionsMetadata.foreach(m => m.writeTo(buffer))
+  }
+}
+
+object PartitionMetadata {
+
+  def readFrom(buffer: ByteBuffer): PartitionMetadata = {
+    val partitionId = getIntInRange(buffer, "partition id", (0, Int.MaxValue)) /* partition id */
+    val doesLeaderExist = getLeaderRequest(buffer.get)
+    val leader = doesLeaderExist match {
+      case LeaderExists => /* leader exists */
+        Some(Broker.readFrom(buffer))
+      case LeaderDoesNotExist => None
+    }
+
+    /* list of all replicas */
+    val numReplicas = getShortInRange(buffer, "number of all replicas", (0, Short.MaxValue))
+    val replicas = new Array[Broker](numReplicas)
+    for(i <- 0 until numReplicas) {
+      replicas(i) = Broker.readFrom(buffer)
+    }
+
+    /* list of in-sync replicas */
+    val numIsr = getShortInRange(buffer, "number of in-sync replicas", (0, Short.MaxValue))
+    val isr = new Array[Broker](numIsr)
+    for(i <- 0 until numIsr) {
+      isr(i) = Broker.readFrom(buffer)
+    }
+
+    val doesLogMetadataExist = getLogSegmentMetadataRequest(buffer.get)
+    val logMetadata = doesLogMetadataExist match {
+      case LogSegmentMetadataExists =>
+        val numLogSegments = getIntInRange(buffer, "total number of log segments", (0, Int.MaxValue))
+        val totalDataSize = getLongInRange(buffer, "total data size", (0, Long.MaxValue))
+        val numSegmentMetadata = getIntInRange(buffer, "number of log segment metadata", (0, Int.MaxValue))
+        val segmentMetadata = numSegmentMetadata match {
+          case 0 => None
+          case _ =>
+            val metadata = new ListBuffer[LogSegmentMetadata]()
+            for(i <- 0 until numSegmentMetadata) {
+              val beginningOffset = getLongInRange(buffer, "beginning offset", (0, Long.MaxValue))
+              val lastModified = getLongInRange(buffer, "last modified time", (0, Long.MaxValue))
+              val size = getLongInRange(buffer, "size of log segment", (0, Long.MaxValue))
+              metadata += new LogSegmentMetadata(beginningOffset, lastModified, size)
+            }
+            Some(metadata)
+        }
+        Some(new LogMetadata(numLogSegments, totalDataSize, segmentMetadata))
+      case LogSegmentMetadataDoesNotExist => None
+    }
+    new PartitionMetadata(partitionId, leader, replicas, isr, logMetadata)
+  }
+
+  def getLeaderRequest(requestId: Byte): LeaderRequest = {
+    requestId match {
+      case LeaderExists.requestId => LeaderExists
+      case LeaderDoesNotExist.requestId => LeaderDoesNotExist
+      case _ => throw new IllegalArgumentException("Unknown leader request id " + requestId)
+    }
+  }
+
+  def getLogSegmentMetadataRequest(requestId: Byte): LogSegmentMetadataRequest = {
+    requestId match {
+      case LogSegmentMetadataExists.requestId => LogSegmentMetadataExists
+      case LogSegmentMetadataDoesNotExist.requestId => LogSegmentMetadataDoesNotExist
+    }
+  }
+}
+
+case class PartitionMetadata(partitionId: Int, leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker],
+                             logMetadata: Option[LogMetadata]) {
+  def sizeInBytes: Int = {
+    var size: Int = 4 /* partition id */ + 1 /* if leader exists */
+
+    leader match {
+      case Some(l) => size += l.sizeInBytes
+      case None =>
+    }
+
+    size += 2 /* number of replicas */
+    size += replicas.foldLeft(0)(_ + _.sizeInBytes)
+    size += 2 /* number of in sync replicas */
+    size += isr.foldLeft(0)(_ + _.sizeInBytes)
+
+    size += 1 /* if log segment metadata exists */
+    logMetadata match {
+      case Some(metadata) => size += metadata.sizeInBytes
+      case None =>
+    }
+    debug("Size of partition metadata = " + size)
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(partitionId)
+
+    /* if leader exists */
+    leader match {
+      case Some(l) =>
+        buffer.put(LeaderExists.requestId)
+        /* leader id host_name port */
+        l.writeTo(buffer)
+      case None => buffer.put(LeaderDoesNotExist.requestId)
+    }
+
+    /* number of replicas */
+    buffer.putShort(replicas.size.toShort)
+    replicas.foreach(r => r.writeTo(buffer))
+
+    /* number of in-sync replicas */
+    buffer.putShort(isr.size.toShort)
+    isr.foreach(r => r.writeTo(buffer))
+
+    /* if log segment metadata exists */
+    logMetadata match {
+      case Some(metadata) =>
+        buffer.put(LogSegmentMetadataExists.requestId)
+        metadata.writeTo(buffer)
+      case None => buffer.put(LogSegmentMetadataDoesNotExist.requestId)
+    }
+
+  }
+}
+
+case class LogMetadata(numLogSegments: Int, totalSize: Long, logSegmentMetadata: Option[Seq[LogSegmentMetadata]]) {
+  def sizeInBytes: Int = {
+    var size: Int = 4 /* num log segments */ + 8 /* total data size */ + 4 /* number of log segment metadata */
+    logSegmentMetadata match {
+      case Some(segmentMetadata) => size += segmentMetadata.foldLeft(0)(_ + _.sizeInBytes)
+      case None =>
+    }
+    debug("Size of log metadata = " + size)
+    size
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(numLogSegments)
+    buffer.putLong(totalSize)
+    /* if segment metadata exists */
+    logSegmentMetadata match {
+      case Some(segmentMetadata) =>
+        /* number of log segments */
+        buffer.putInt(segmentMetadata.size)
+        segmentMetadata.foreach(m => m.writeTo(buffer))
+      case None =>
+        buffer.putInt(0)
+    }
+  }
+}
+
+case class LogSegmentMetadata(beginningOffset: Long, lastModified: Long, size: Long) {
+  def sizeInBytes: Int = {
+    8 /* beginning offset */ + 8 /* last modified timestamp */ + 8 /* log segment size in bytes */
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putLong(beginningOffset)
+    buffer.putLong(lastModified)
+    buffer.putLong(size)
+  }
+}
+
+
Index: core/src/main/scala/kafka/api/FetchRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchRequest.scala	(revision 1234984)
+++ core/src/main/scala/kafka/api/FetchRequest.scala	(working copy)
@@ -38,7 +38,7 @@
                    val maxSize: Int) extends Request(RequestKeys.Fetch) {
   
   def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic, "UTF-8")
+    Utils.writeShortString(buffer, topic)
     buffer.putInt(partition)
     buffer.putLong(offset)
     buffer.putInt(maxSize)
Index: core/src/main/scala/kafka/api/RequestKeys.scala
===================================================================
--- core/src/main/scala/kafka/api/RequestKeys.scala	(revision 1234984)
+++ core/src/main/scala/kafka/api/RequestKeys.scala	(working copy)
@@ -23,4 +23,5 @@
   val MultiFetch: Short = 2
   val MultiProduce: Short = 3
   val Offsets: Short = 4
+  val TopicMetadata: Short = 5
 }
Index: core/src/main/scala/kafka/api/TopicMetadataRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetadataRequest.scala	(revision 0)
+++ core/src/main/scala/kafka/api/TopicMetadataRequest.scala	(revision 0)
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.utils.Utils._
+import kafka.network.{Send, Request}
+import java.nio.channels.GatheringByteChannel
+import kafka.common.ErrorMapping
+import collection.mutable.ListBuffer
+
+sealed trait DetailedMetadataRequest { def requestId: Short }
+case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] }
+case object NoSegmentMetadata extends DetailedMetadataRequest { val requestId = 0.asInstanceOf[Short] }
+
+object TopicMetadataRequest {
+
+  /**
+   * TopicMetadataRequest has the following format -
+   *
+   * number of topics (4 bytes) list of topics (2 bytes + topic.length per topic) detailedMetadata (2 bytes) timestamp (8 bytes) count (4 bytes)
+   *
+   * The detailedMetadata field is a placeholder for requesting various details about partition and log metadata
+   * By default, the value for this field is 0, which means it will just return leader, replica and ISR metadata for
+   * all partitions of the list of topics mentioned in the request.
+   */
+  def getDetailedMetadataRequest(requestId: Short): DetailedMetadataRequest = {
+    requestId match {
+      case SegmentMetadata.requestId => SegmentMetadata
+      case NoSegmentMetadata.requestId => NoSegmentMetadata
+      case _ => throw new IllegalArgumentException("Unknown detailed metadata request id " + requestId)
+    }
+  }
+
+  def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
+    val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
+    val topics = new ListBuffer[String]()
+    for(i <- 0 until numTopics)
+      topics += readShortString(buffer, "UTF-8")
+    val topicsList = topics.toList
+    val returnDetailedMetadata = getDetailedMetadataRequest(buffer.getShort)
+    var timestamp: Option[Long] = None
+    var count: Option[Int] = None
+    returnDetailedMetadata match {
+      case NoSegmentMetadata =>
+      case SegmentMetadata =>
+        timestamp = Some(buffer.getLong)
+        count = Some(buffer.getInt)
+      case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request "
+                                                    + returnDetailedMetadata.requestId)
+    }
+    debug("topic = %s, detailed metadata request = %d"
+          .format(topicsList.head, returnDetailedMetadata.requestId))
+    new TopicMetadataRequest(topics.toList, returnDetailedMetadata, timestamp, count)
+  }
+
+  def serializeTopicMetadata(topicMetadata: Seq[TopicMetadata]): ByteBuffer = {
+    val size = topicMetadata.foldLeft(4 /* num topics */)(_ + _.sizeInBytes)
+    val buffer = ByteBuffer.allocate(size)
+    /* number of topics */
+    buffer.putInt(topicMetadata.size)
+    /* topic partition_metadata */
+    topicMetadata.foreach(m => m.writeTo(buffer))
+    buffer.rewind()
+    buffer
+  }
+
+  def deserializeTopicsMetadataResponse(buffer: ByteBuffer): Seq[TopicMetadata] = {
+    /* number of topics */
+    val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
+    val topicMetadata = new Array[TopicMetadata](numTopics)
+    for(i <- 0 until numTopics)
+      topicMetadata(i) = TopicMetadata.readFrom(buffer)
+    topicMetadata
+  }
+}
+
+case class TopicMetadataRequest(val topics: Seq[String],
+                                val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
+                                val timestamp: Option[Long] = None, val count: Option[Int] = None)
+  extends Request(RequestKeys.TopicMetadata){
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(topics.size)
+    topics.foreach(topic => writeShortString(buffer, topic))
+    buffer.putShort(detailedMetadata.requestId)
+    detailedMetadata match {
+      case SegmentMetadata =>
+        buffer.putLong(timestamp.get)
+        buffer.putInt(count.get)
+      case NoSegmentMetadata =>
+      case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
+    }
+  }
+
+  def sizeInBytes(): Int = {
+    var size: Int = 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
+                    2 /* detailed metadata */
+    detailedMetadata match {
+      case SegmentMetadata =>
+        size += 8 /* timestamp */ + 4 /* count */
+      case NoSegmentMetadata =>
+      case _ => throw new IllegalArgumentException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
+    }
+    size
+  }
+}
+
+class TopicMetadataSend(topicsMetadata: Seq[TopicMetadata]) extends Send {
+  private var size: Int = topicsMetadata.foldLeft(0)(_ + _.sizeInBytes)
+  private val header = ByteBuffer.allocate(6)
+  val metadata = TopicMetadataRequest.serializeTopicMetadata(topicsMetadata)
+  header.putInt(size + 2)
+  header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
+  header.rewind()
+
+  var complete: Boolean = false
+
+  def writeTo(channel: GatheringByteChannel): Int = {
+    expectIncomplete()
+    var written = 0
+    if(header.hasRemaining)
+      written += channel.write(header)
+    if(!header.hasRemaining && metadata.hasRemaining)
+      written += channel.write(metadata)
+
+    if(!metadata.hasRemaining)
+      complete = true
+    written
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/api/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/ProducerRequest.scala	(revision 1234984)
+++ core/src/main/scala/kafka/api/ProducerRequest.scala	(working copy)
@@ -41,7 +41,7 @@
                       val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
 
   def writeTo(buffer: ByteBuffer) {
-    Utils.writeShortString(buffer, topic, "UTF-8")
+    Utils.writeShortString(buffer, topic)
     buffer.putInt(partition)
     buffer.putInt(messages.serialized.limit)
     buffer.put(messages.serialized)
Index: core/src/main/scala/kafka/javaapi/Implicits.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/Implicits.scala	(revision 1234984)
+++ core/src/main/scala/kafka/javaapi/Implicits.scala	(working copy)
@@ -16,10 +16,8 @@
 */
 package kafka.javaapi
 
-import java.nio.ByteBuffer
 import kafka.serializer.Encoder
-import kafka.producer.{ProducerConfig, ProducerPool}
-import kafka.producer.async.{AsyncProducerConfig, QueueItem}
+import kafka.producer.async.QueueItem
 import kafka.utils.Logging
 
 private[javaapi] object Implicits extends Logging {
Index: core/src/main/scala/kafka/javaapi/producer/Producer.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/producer/Producer.scala	(revision 1234984)
+++ core/src/main/scala/kafka/javaapi/producer/Producer.scala	(working copy)
@@ -54,7 +54,6 @@
    * partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
    * object in the  send API
    */
-  import kafka.javaapi.Implicits._
   def this(config: ProducerConfig,
            encoder: Encoder[V],
            eventHandler: kafka.javaapi.producer.async.EventHandler[V],
Index: core/src/main/scala/kafka/javaapi/message/MessageSet.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/message/MessageSet.scala	(revision 1234984)
+++ core/src/main/scala/kafka/javaapi/message/MessageSet.scala	(working copy)
@@ -17,8 +17,7 @@
 
 package kafka.javaapi.message
 
-import java.nio.channels._
-import kafka.message.{MessageAndOffset, InvalidMessageException, Message}
+import kafka.message.{MessageAndOffset, InvalidMessageException}
 
 /**
  * A set of messages. A message set has a fixed serialized form, though the container
Index: project/build.properties
===================================================================
--- project/build.properties	(revision 1234984)
+++ project/build.properties	(working copy)
@@ -16,7 +16,7 @@
 #Mon Feb 28 11:55:49 PST 2011
 project.name=Kafka
 sbt.version=0.7.5
-project.version=0.7.0
+project.version=0.8.0
 build.scala.versions=2.8.0
 contrib.root.dir=contrib
 lib.dir=lib
