Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 1387302)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(working copy)
@@ -29,7 +29,7 @@
 import kafka.utils.TestUtils._
 import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
 import kafka.common.ErrorMapping
-import kafka.api.{RequestKeys, TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
+import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetadataRequest}
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -76,7 +76,6 @@
     val partitionMetadata = topicMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
     assertEquals(1, partitionMetadata.head.replicas.size)
   }
 
@@ -90,7 +89,6 @@
     val partitionMetadata = topicMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
     assertEquals(0, partitionMetadata.head.replicas.size)
     assertEquals(None, partitionMetadata.head.leader)
     assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode)
@@ -117,7 +115,7 @@
     val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
     
     // check assertions
-    val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
+    val topicMetadata = TopicMetadataResponse.readFrom(metadataResponse).topicsMetadata
 
     topicMetadata
   }
Index: core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala	(revision 1387302)
+++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala	(working copy)
@@ -121,8 +121,8 @@
     new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2))
   }
 
-  def createTestTopicMetadataResponse: TopicMetaDataResponse = {
-    new TopicMetaDataResponse(1, Seq(topicmetaData1, topicmetaData2))
+  def createTestTopicMetadataResponse: TopicMetadataResponse = {
+    new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2))
   }
 }
 
@@ -215,7 +215,7 @@
     buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes)
     topicMetadataResponse.writeTo(buffer)
     buffer.rewind()
-    val deserializedTopicMetadataResponse = TopicMetaDataResponse.readFrom(buffer)
+    val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
     assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
                  deserializedTopicMetadataResponse)
   }
Index: core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala	(revision 1387302)
+++ core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala	(working copy)
@@ -22,40 +22,24 @@
 import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
 
 class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestCases {
+  override def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet =
+    new ByteBufferMessageSet(new kafka.message.ByteBufferMessageSet(compressed, messages: _*))
 
-  override def createMessageSet(messages: Seq[Message],
-                                compressed: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet =
-    new ByteBufferMessageSet(compressed, getMessageList(messages: _*))
-  
+  val msgSeq: Seq[Message] = Seq(new Message("hello".getBytes()), new Message("there".getBytes()))
+
   @Test
   def testEquals() {
-    val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                               messages = getMessageList(new Message("hello".getBytes()),
-                                                                         new Message("there".getBytes())))
-    val moreMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                messages = getMessageList(new Message("hello".getBytes()),
-                                                                          new Message("there".getBytes())))
-
+    val messageList = createMessageSet(msgSeq, NoCompressionCodec)
+    val moreMessages = createMessageSet(msgSeq, NoCompressionCodec)
     assertEquals(messageList, moreMessages)
     assertTrue(messageList.equals(moreMessages))
   }
 
   @Test
   def testEqualsWithCompression () {
-    val messageList = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                            messages = getMessageList(new Message("hello".getBytes()),
-                                                                      new Message("there".getBytes())))
-    val moreMessages = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                                messages = getMessageList(new Message("hello".getBytes()),
-                                                                          new Message("there".getBytes())))
-
+    val messageList = createMessageSet(msgSeq, DefaultCompressionCodec)
+    val moreMessages = createMessageSet(msgSeq, DefaultCompressionCodec)
     assertEquals(messageList, moreMessages)
     assertTrue(messageList.equals(moreMessages))
   }
-
-  private def getMessageList(messages: Message*): java.util.List[Message] = {
-    val messageList = new java.util.ArrayList[Message]()
-    messages.foreach(m => messageList.add(m))
-    messageList
-  }
 }
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1387302)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -111,7 +111,7 @@
 
   def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
     val response = doSend(request)
-    val topicMetaDataResponse = TopicMetaDataResponse.readFrom(response.buffer)
+    val topicMetaDataResponse = TopicMetadataResponse.readFrom(response.buffer)
     // try to throw exception based on global error codes
     ErrorMapping.maybeThrowException(topicMetaDataResponse.errorCode)
     topicMetaDataResponse.topicsMetadata
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(revision 1387302)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala	(working copy)
@@ -22,7 +22,7 @@
 import kafka.api.LeaderAndIsr
 import kafka.common.StateChangeFailedException
 import java.util.concurrent.atomic.AtomicBoolean
-import org.I0Itec.zkclient.{IZkChildListener}
+import org.I0Itec.zkclient.IZkChildListener
 
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1387302)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -181,13 +181,11 @@
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        buffer.equals(that.buffer) && initialOffset == that.initialOffset
       case _ => false
     }
   }
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
-
   override def hashCode: Int = {
     var hash = 17
     hash = hash * 31 + buffer.hashCode
Index: core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
===================================================================
--- core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala	(revision 1387302)
+++ core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala	(working copy)
@@ -19,7 +19,7 @@
 import kafka.utils.ZkUtils._
 import kafka.utils.Logging
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.I0Itec.zkclient.{IZkDataListener}
+import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
 
 /**
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1387302)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -411,7 +411,7 @@
       errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
     }
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
-    val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
+    val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
Index: core/src/main/scala/kafka/javaapi/Implicits.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/Implicits.scala	(revision 1387302)
+++ core/src/main/scala/kafka/javaapi/Implicits.scala	(working copy)
@@ -19,14 +19,14 @@
 import kafka.utils.Logging
 
 private[javaapi] object Implicits extends Logging {
-  implicit def javaMessageSetToScalaMessageSet(messageSet: kafka.javaapi.message.ByteBufferMessageSet):
-     kafka.message.ByteBufferMessageSet = messageSet.underlying
-
   implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
      kafka.javaapi.message.ByteBufferMessageSet = {
-    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer, messageSet.initialOffset)
+    new kafka.javaapi.message.ByteBufferMessageSet(messageSet)
   }
 
   implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =
-    new kafka.javaapi.FetchResponse(response.versionId, response.correlationId, response.data)
+    new kafka.javaapi.FetchResponse(response)
+
+  implicit def toJavaTopicMetadata(topicMetadata: kafka.api.TopicMetadata): kafka.javaapi.TopicMetadata =
+    new kafka.javaapi.TopicMetadata(topicMetadata)
 }
Index: core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala	(revision 1387302)
+++ core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala	(working copy)
@@ -16,21 +16,10 @@
 */
 package kafka.javaapi.message
 
-import java.nio.ByteBuffer
 import kafka.message._
 
-class ByteBufferMessageSet(private val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet {
-  val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, initialOffset)
-  def this(buffer: ByteBuffer) = this(buffer, 0L)
+class ByteBufferMessageSet(private val underlying: kafka.message.ByteBufferMessageSet) extends MessageSet {
 
-  def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*), 0L)
-  }
-
-  def this(messages: java.util.List[Message]) {
-    this(NoCompressionCodec, messages)
-  }
-
   def validBytes: Long = underlying.validBytes
 
   override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
@@ -53,13 +42,11 @@
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        underlying.equals(that.underlying)
       case _ => false
     }
   }
 
-  def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
-
   override def hashCode: Int = underlying.hashCode
 
 }
Index: core/src/main/scala/kafka/javaapi/FetchResponse.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/FetchResponse.scala	(revision 1387302)
+++ core/src/main/scala/kafka/javaapi/FetchResponse.scala	(working copy)
@@ -17,16 +17,8 @@
 
 package kafka.javaapi
 
-import kafka.api.PartitionData
-import kafka.common.TopicAndPartition
+class FetchResponse(private val underlying: kafka.api.FetchResponse) {
 
-
-class FetchResponse( val versionId: Short,
-                     val correlationId: Int,
-                     private val data: Map[TopicAndPartition, PartitionData] ) {
-
-  private val underlying = kafka.api.FetchResponse(versionId, correlationId, data)
-
   def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = {
     import Implicits._
     underlying.messageSet(topic, partition)
Index: core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala	(revision 1387302)
+++ core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala	(working copy)
@@ -28,7 +28,7 @@
                      val port: Int,
                      val soTimeout: Int,
                      val bufferSize: Int) {
-  val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
+  private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
 
   /**
    *  Fetch a set of messages from a topic. This version of the fetch method
Index: core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala	(revision 1387302)
+++ core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala	(working copy)
@@ -62,7 +62,7 @@
                                  val enableFetcher: Boolean) // for testing only
     extends ConsumerConnector {
 
-  val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
+  private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
 
   def this(config: ConsumerConfig) = this(config, true)
 
Index: core/src/main/scala/kafka/javaapi/TopicMetadata.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/TopicMetadata.scala	(revision 0)
+++ core/src/main/scala/kafka/javaapi/TopicMetadata.scala	(working copy)
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import java.nio.ByteBuffer
+import kafka.cluster.Broker
+import scala.collection.JavaConversions.asList
+
+class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
+  def topic: String = underlying.topic
+
+  def partitionsMetadata: java.util.List[PartitionMetadata] = asList(underlying.partitionsMetadata.map(new PartitionMetadata(_)))
+
+  def errorCode: Short = underlying.errorCode
+
+  def sizeInBytes: Int = underlying.sizeInBytes
+
+  def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
+}
+
+
+class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
+  def partitionId: Int = underlying.partitionId
+
+  def leader: Broker = {
+    underlying.leader match {
+      case Some(ldr) => ldr
+      case None => null
+    }
+  }
+
+  def replicas: java.util.List[Broker] = asList(underlying.replicas)
+
+  def isr: java.util.List[Broker] = asList(underlying.isr)
+
+  def errorCode: Short = underlying.errorCode
+
+  def sizeInBytes: Int = underlying.sizeInBytes
+
+  def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala	(revision 0)
+++ core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala	(working copy)
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import kafka.api._
+import java.nio.ByteBuffer
+
+class TopicMetadataRequest(val versionId: Short,
+                           val clientId: String,
+                           val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
+  private val underlying: kafka.api.TopicMetadataRequest =
+    new kafka.api.TopicMetadataRequest(versionId, clientId, scala.collection.JavaConversions.asBuffer(topics))
+
+  def this(topics: java.util.List[String]) =
+    this(kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+
+  def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
+
+  def sizeInBytes: Int = underlying.sizeInBytes()
+}
Index: core/src/main/scala/kafka/admin/AdminUtils.scala
===================================================================
--- core/src/main/scala/kafka/admin/AdminUtils.scala	(revision 1387302)
+++ core/src/main/scala/kafka/admin/AdminUtils.scala	(working copy)
@@ -121,15 +121,14 @@
               case e => throw new ReplicaNotAvailableException(e)
             }
 
-            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError,
-              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
           }catch {
-            case e: ReplicaNotAvailableException => new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
-            case le: LeaderNotAvailableException => new PartitionMetadata(partition, None, replicaInfo, isrInfo,
-              ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]),
-              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+            case e: ReplicaNotAvailableException =>
+              new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
+                                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+            case le: LeaderNotAvailableException =>
+              new PartitionMetadata(partition, None, replicaInfo, isrInfo,
+                                    ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]))
           }
         }
         new TopicMetadata(topic, partitionMetadata)
Index: core/src/main/scala/kafka/api/TopicMetaDataResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetaDataResponse.scala	(revision 1387302)
+++ core/src/main/scala/kafka/api/TopicMetaDataResponse.scala	(working copy)
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
-
-
-object TopicMetaDataResponse {
-
-  def readFrom(buffer: ByteBuffer): TopicMetaDataResponse = {
-    val versionId = buffer.getShort
-    val errorCode = buffer.getShort
-
-    val topicCount = buffer.getInt
-    val topicsMetadata = new Array[TopicMetadata](topicCount)
-    for( i <- 0 until topicCount) {
-      topicsMetadata(i) = TopicMetadata.readFrom(buffer)
-    }
-    new TopicMetaDataResponse(versionId, topicsMetadata.toSeq, errorCode)
-  }
-}
-
-case class TopicMetaDataResponse(versionId: Short,
-                                 topicsMetadata: Seq[TopicMetadata],
-                                 errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse
-{
-  val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes) + 2
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    /* error code */
-    buffer.putShort(errorCode)
-    /* topic metadata */
-    buffer.putInt(topicsMetadata.length)
-    topicsMetadata.foreach(_.writeTo(buffer))
-  }
-}
\ No newline at end of file
Index: core/src/main/scala/kafka/api/TopicMetadataResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetadataResponse.scala	(revision 0)
+++ core/src/main/scala/kafka/api/TopicMetadataResponse.scala	(working copy)
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.common.ErrorMapping
+
+
+object TopicMetadataResponse {
+
+  def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
+    val versionId = buffer.getShort
+    val errorCode = buffer.getShort
+
+    val topicCount = buffer.getInt
+    val topicsMetadata = new Array[TopicMetadata](topicCount)
+    for( i <- 0 until topicCount) {
+      topicsMetadata(i) = TopicMetadata.readFrom(buffer)
+    }
+    new TopicMetadataResponse(versionId, topicsMetadata.toSeq, errorCode)
+  }
+}
+
+case class TopicMetadataResponse(versionId: Short,
+                                 topicsMetadata: Seq[TopicMetadata],
+                                 errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse
+{
+  val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes) + 2
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    /* error code */
+    buffer.putShort(errorCode)
+    /* topic metadata */
+    buffer.putInt(topicsMetadata.length)
+    topicsMetadata.foreach(_.writeTo(buffer))
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/api/TopicMetadata.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetadata.scala	(revision 1387302)
+++ core/src/main/scala/kafka/api/TopicMetadata.scala	(working copy)
@@ -51,10 +51,6 @@
 case object LeaderExists extends LeaderRequest { val requestId: Byte = 1 }
 case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 }
 
-sealed trait LogSegmentMetadataRequest { def requestId: Byte }
-case object LogSegmentMetadataExists extends LogSegmentMetadataRequest { val requestId: Byte = 1 }
-case object LogSegmentMetadataDoesNotExist extends LogSegmentMetadataRequest { val requestId: Byte = 0 }
-
 object TopicMetadata {
 
   def readFrom(buffer: ByteBuffer): TopicMetadata = {
@@ -114,28 +110,7 @@
       isr(i) = Broker.readFrom(buffer)
     }
 
-    val doesLogMetadataExist = getLogSegmentMetadataRequest(buffer.get)
-    val logMetadata = doesLogMetadataExist match {
-      case LogSegmentMetadataExists =>
-        val numLogSegments = getIntInRange(buffer, "total number of log segments", (0, Int.MaxValue))
-        val totalDataSize = getLongInRange(buffer, "total data size", (0, Long.MaxValue))
-        val numSegmentMetadata = getIntInRange(buffer, "number of log segment metadata", (0, Int.MaxValue))
-        val segmentMetadata = numSegmentMetadata match {
-          case 0 => None
-          case _ =>
-            val metadata = new ListBuffer[LogSegmentMetadata]()
-            for(i <- 0 until numSegmentMetadata) {
-              val beginningOffset = getLongInRange(buffer, "beginning offset", (0, Long.MaxValue))
-              val lastModified = getLongInRange(buffer, "last modified time", (0, Long.MaxValue))
-              val size = getLongInRange(buffer, "size of log segment", (0, Long.MaxValue))
-              metadata += new LogSegmentMetadata(beginningOffset, lastModified, size)
-            }
-            Some(metadata)
-        }
-        Some(new LogMetadata(numLogSegments, totalDataSize, segmentMetadata))
-      case LogSegmentMetadataDoesNotExist => None
-    }
-    new PartitionMetadata(partitionId, leader, replicas, isr, errorCode, logMetadata)
+    new PartitionMetadata(partitionId, leader, replicas, isr, errorCode)
   }
 
   private def getLeaderRequest(requestId: Byte): LeaderRequest = {
@@ -145,17 +120,10 @@
       case _ => throw new KafkaException("Unknown leader request id " + requestId)
     }
   }
-
-  private def getLogSegmentMetadataRequest(requestId: Byte): LogSegmentMetadataRequest = {
-    requestId match {
-      case LogSegmentMetadataExists.requestId => LogSegmentMetadataExists
-      case LogSegmentMetadataDoesNotExist.requestId => LogSegmentMetadataDoesNotExist
-    }
-  }
 }
 
 case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
-                             errorCode: Short = ErrorMapping.NoError, logMetadata: Option[LogMetadata] = None) {
+                             errorCode: Short = ErrorMapping.NoError) {
   def sizeInBytes: Int = {
     var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/
 
@@ -169,11 +137,6 @@
     size += 2 /* number of in sync replicas */
     size += isr.foldLeft(0)(_ + _.sizeInBytes)
 
-    size += 1 /* if log segment metadata exists */
-    logMetadata match {
-      case Some(metadata) => size += metadata.sizeInBytes
-      case None =>
-    }
     debug("Size of partition metadata = " + size)
     size
   }
@@ -198,54 +161,7 @@
     /* number of in-sync replicas */
     buffer.putShort(isr.size.toShort)
     isr.foreach(r => r.writeTo(buffer))
-
-    /* if log segment metadata exists */
-    logMetadata match {
-      case Some(metadata) =>
-        buffer.put(LogSegmentMetadataExists.requestId)
-        metadata.writeTo(buffer)
-      case None => buffer.put(LogSegmentMetadataDoesNotExist.requestId)
-    }
-
   }
 }
 
-case class LogMetadata(numLogSegments: Int, totalSize: Long, logSegmentMetadata: Option[Seq[LogSegmentMetadata]]) {
-  def sizeInBytes: Int = {
-    var size: Int = 4 /* num log segments */ + 8 /* total data size */ + 4 /* number of log segment metadata */
-    logSegmentMetadata match {
-      case Some(segmentMetadata) => size += segmentMetadata.foldLeft(0)(_ + _.sizeInBytes)
-      case None =>
-    }
-    debug("Size of log metadata = " + size)
-    size
-  }
 
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(numLogSegments)
-    buffer.putLong(totalSize)
-    /* if segment metadata exists */
-    logSegmentMetadata match {
-      case Some(segmentMetadata) =>
-        /* number of log segments */
-        buffer.putInt(segmentMetadata.size)
-        segmentMetadata.foreach(m => m.writeTo(buffer))
-      case None =>
-        buffer.putInt(0)
-    }
-  }
-}
-
-case class LogSegmentMetadata(beginningOffset: Long, lastModified: Long, size: Long) {
-  def sizeInBytes: Int = {
-    8 /* beginning offset */ + 8 /* last modified timestamp */ + 8 /* log segment size in bytes */
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putLong(beginningOffset)
-    buffer.putLong(lastModified)
-    buffer.putLong(size)
-  }
-}
-
-
Index: core/src/main/scala/kafka/api/TopicMetadataRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetadataRequest.scala	(revision 1387302)
+++ core/src/main/scala/kafka/api/TopicMetadataRequest.scala	(working copy)
@@ -21,32 +21,15 @@
 import kafka.utils.Utils._
 import collection.mutable.ListBuffer
 import kafka.utils._
-import kafka.common.KafkaException
 
-sealed trait DetailedMetadataRequest { def requestId: Short }
-case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] }
-case object NoSegmentMetadata extends DetailedMetadataRequest { val requestId = 0.asInstanceOf[Short] }
-
 object TopicMetadataRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
 
   /**
    * TopicMetadataRequest has the following format -
-   *
    * number of topics (4 bytes) list of topics (2 bytes + topic.length per topic) detailedMetadata (2 bytes) timestamp (8 bytes) count (4 bytes)
-   *
-   * The detailedMetadata field is a placeholder for requesting various details about partition and log metadata
-   * By default, the value for this field is 0, which means it will just return leader, replica and ISR metadata for
-   * all partitions of the list of topics mentioned in the request.
    */
-  def getDetailedMetadataRequest(requestId: Short): DetailedMetadataRequest = {
-    requestId match {
-      case SegmentMetadata.requestId => SegmentMetadata
-      case NoSegmentMetadata.requestId => NoSegmentMetadata
-      case _ => throw new KafkaException("Unknown detailed metadata request id " + requestId)
-    }
-  }
 
   def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
     val versionId = buffer.getShort
@@ -56,60 +39,27 @@
     for(i <- 0 until numTopics)
       topics += readShortString(buffer, "UTF-8")
     val topicsList = topics.toList
-    val returnDetailedMetadata = getDetailedMetadataRequest(buffer.getShort)
-    var timestamp: Option[Long] = None
-    var count: Option[Int] = None
-    returnDetailedMetadata match {
-      case NoSegmentMetadata =>
-      case SegmentMetadata =>
-        timestamp = Some(buffer.getLong)
-        count = Some(buffer.getInt)
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request "
-                                                    + returnDetailedMetadata.requestId)
-    }
-    debug("topic = %s, detailed metadata request = %d"
-          .format(topicsList.head, returnDetailedMetadata.requestId))
-    new TopicMetadataRequest(versionId, clientId, topics.toList, returnDetailedMetadata, timestamp, count)
+    debug("topic = %s".format(topicsList.head))
+    new TopicMetadataRequest(versionId, clientId, topics.toList)
   }
 }
 
 case class TopicMetadataRequest(val versionId: Short,
                                 val clientId: String,
-                                val topics: Seq[String],
-                                val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
-                                val timestamp: Option[Long] = None, val count: Option[Int] = None)
+                                val topics: Seq[String])
  extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
 def this(topics: Seq[String]) =
-  this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None)
+  this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics)
 
-
-
-
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
-    buffer.putShort(detailedMetadata.requestId)
-    detailedMetadata match {
-      case SegmentMetadata =>
-        buffer.putLong(timestamp.get)
-        buffer.putInt(count.get)
-      case NoSegmentMetadata =>
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
-    }
   }
 
   def sizeInBytes(): Int = {
-    var size: Int = 2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
-                    2 /* detailed metadata */
-    detailedMetadata match {
-      case SegmentMetadata =>
-        size += 8 /* timestamp */ + 4 /* count */
-      case NoSegmentMetadata =>
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
-    }
-    size
+    2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
   }
 }
