Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 1384928)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(working copy)
@@ -29,7 +29,7 @@
 import kafka.utils.TestUtils._
 import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
 import kafka.common.ErrorMapping
-import kafka.api.{RequestKeys, TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
+import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetadataRequest}
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -117,7 +117,7 @@
     val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
     
     // check assertions
-    val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
+    val topicMetadata = TopicMetadataResponse.readFrom(metadataResponse).topicsMetadata
 
     topicMetadata
   }
Index: core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala	(revision 1384928)
+++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala	(working copy)
@@ -105,8 +105,8 @@
     new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2))
   }
 
-  def createTestTopicMetadataResponse: TopicMetaDataResponse = {
-    new TopicMetaDataResponse(1, Seq(topicmetaData1, topicmetaData2))
+  def createTestTopicMetadataResponse: TopicMetadataResponse = {
+    new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2))
   }
 }
 
@@ -199,7 +199,7 @@
     buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes)
     topicMetadataResponse.writeTo(buffer)
     buffer.rewind()
-    val deserializedTopicMetadataResponse = TopicMetaDataResponse.readFrom(buffer)
+    val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
     assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
                  deserializedTopicMetadataResponse)
   }
Index: core/src/test/scala/unit/kafka/javaapi/TopicMetadataResponseTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/TopicMetadataResponseTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/javaapi/TopicMetadataResponseTest.scala	(working copy)
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.api.RequestKeys
+import junit.framework.Assert._
+import org.easymock.EasyMock
+import kafka.server.{KafkaConfig, KafkaApis, ReplicaManager}
+import kafka.utils.TestUtils
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import java.nio.ByteBuffer
+
+class TopicMetadataResponseTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val props = createBrokerConfigs(1)
+  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+
+  @Test
+  def testTopicMetadataResponse() {
+    val response: kafka.api.TopicMetadataResponse = mockLogManagerAndTestTopic("test")
+    val responseCheck: kafka.javaapi.TopicMetadataResponse =
+      new kafka.javaapi.TopicMetadataResponse(response.versionId,
+                                              scala.collection.JavaConversions.asList(response.topicsMetadata),
+                                              response.errorCode)
+    assertEquals("Size should be equal", response.sizeInBytes, responseCheck.sizeInBytes)
+
+    val serializedMetadataResponse = ByteBuffer.allocate(response.sizeInBytes + 2)
+    val serializedMetadataResponseCheck = ByteBuffer.allocate(responseCheck.sizeInBytes + 2)
+    response.writeTo(serializedMetadataResponse)
+    responseCheck.writeTo(serializedMetadataResponseCheck)
+    TestUtils.checkEquals(serializedMetadataResponse, serializedMetadataResponseCheck)
+  }
+
+  private def mockLogManagerAndTestTopic(topic: String): kafka.api.TopicMetadataResponse = {
+    // topic metadata request only requires 1 call from the replica manager
+    val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
+    EasyMock.expect(replicaManager.config).andReturn(configs.head).times(2)
+    EasyMock.replay(replicaManager)
+
+    // create a topic metadata request
+    val topicMetadataRequest = new kafka.api.TopicMetadataRequest(List(topic))
+
+    val serializedMetadataRequest = TestUtils.createRequestByteBuffer(topicMetadataRequest)
+
+    // create the kafka request handler
+    val requestChannel = new RequestChannel(2, 5)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
+
+    apis.handleTopicMetadataRequest(new RequestChannel.Request
+    (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeNs=1))
+    val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
+
+    kafka.api.TopicMetadataResponse.readFrom(metadataResponse)
+  }
+}
Index: core/src/test/scala/unit/kafka/javaapi/TopicMetadataRequestTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/TopicMetadataRequestTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/javaapi/TopicMetadataRequestTest.scala	(working copy)
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import org.junit.Test
+import junit.framework.Assert._
+import java.nio.ByteBuffer
+import kafka.utils.TestUtils
+
+class TopicMetadataRequestTest {
+
+  @Test
+  def testTopicMetadataRequest() {
+    val topicSeq = Array("Veni", "Vidi", "Dormivi")
+    val request = new TopicMetadataRequest(scala.collection.JavaConversions.asList(topicSeq))
+    val requestCheck = new kafka.api.TopicMetadataRequest(topicSeq)
+    assertEquals("Size should be equal", request.sizeInBytes, requestCheck.sizeInBytes)
+
+    val serializedMetadataRequest = ByteBuffer.allocate(request.sizeInBytes + 2)
+    val serializedMetadataRequestCheck = ByteBuffer.allocate(requestCheck.sizeInBytes + 2)
+    request.writeTo(serializedMetadataRequest)
+    requestCheck.writeTo(serializedMetadataRequestCheck)
+    TestUtils.checkEquals(serializedMetadataRequest, serializedMetadataRequestCheck)
+  }
+}
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1384928)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -111,7 +111,7 @@
 
   def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
     val response = doSend(request)
-    val topicMetaDataResponse = TopicMetaDataResponse.readFrom(response.buffer)
+    val topicMetaDataResponse = TopicMetadataResponse.readFrom(response.buffer)
     // try to throw exception based on global error codes
     ErrorMapping.maybeThrowException(topicMetaDataResponse.errorCode)
     topicMetaDataResponse.topicsMetadata
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1384928)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -434,7 +434,7 @@
       errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
     }
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
-    val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
+    val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
Index: core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala	(revision 0)
+++ core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala	(working copy)
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import kafka.api.{RequestOrResponse, TopicMetadata}
+import kafka.common.ErrorMapping
+import java.nio.ByteBuffer
+
+
+class TopicMetadataResponse(versionId: Short,
+                            topicsMetadata: java.util.List[TopicMetadata],
+                            errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse {
+  val underlying: kafka.api.TopicMetadataResponse =
+    new kafka.api.TopicMetadataResponse(versionId, scala.collection.JavaConversions.asBuffer(topicsMetadata), errorCode)
+
+  def sizeInBytes: Int = underlying.sizeInBytes
+
+  def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
+}
Index: core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala	(revision 1384928)
+++ core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala	(working copy)
@@ -18,7 +18,7 @@
 
 import kafka.producer.SyncProducerConfig
 import kafka.javaapi.message.ByteBufferMessageSet
-import kafka.api.{ProducerResponse, PartitionData, TopicData}
+import kafka.api._
 
 class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
 
@@ -37,6 +37,10 @@
     underlying.send(producerRequest)
   }
 
+  def send(request: kafka.javaapi.TopicMetadataRequest): java.util.List[TopicMetadata] = {
+    scala.collection.JavaConversions.asList(underlying.send(request.underlying))
+  }
+
   def close() {
     underlying.close
   }
Index: core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala	(revision 0)
+++ core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala	(working copy)
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import kafka.api._
+import java.nio.ByteBuffer
+
+class TopicMetadataRequest(val versionId: Short,
+                           val clientId: String,
+                           val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
+  val underlying: kafka.api.TopicMetadataRequest =
+    new kafka.api.TopicMetadataRequest(versionId, clientId, scala.collection.JavaConversions.asBuffer(topics))
+
+  def this(topics: java.util.List[String]) =
+    this(kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+
+  def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
+
+  def sizeInBytes(): Int = underlying.sizeInBytes()
+}
Index: core/src/main/scala/kafka/api/TopicMetaDataResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetaDataResponse.scala	(revision 1384928)
+++ core/src/main/scala/kafka/api/TopicMetaDataResponse.scala	(working copy)
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
-
-
-object TopicMetaDataResponse {
-
-  def readFrom(buffer: ByteBuffer): TopicMetaDataResponse = {
-    val versionId = buffer.getShort
-    val errorCode = buffer.getShort
-
-    val topicCount = buffer.getInt
-    val topicsMetadata = new Array[TopicMetadata](topicCount)
-    for( i <- 0 until topicCount) {
-      topicsMetadata(i) = TopicMetadata.readFrom(buffer)
-    }
-    new TopicMetaDataResponse(versionId, topicsMetadata.toSeq, errorCode)
-  }
-}
-
-case class TopicMetaDataResponse(versionId: Short,
-                                 topicsMetadata: Seq[TopicMetadata],
-                                 errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse
-{
-  val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes) + 2
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    /* error code */
-    buffer.putShort(errorCode)
-    /* topic metadata */
-    buffer.putInt(topicsMetadata.length)
-    topicsMetadata.foreach(_.writeTo(buffer))
-  }
-}
\ No newline at end of file
Index: core/src/main/scala/kafka/api/TopicMetadataResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetadataResponse.scala	(working copy)
+++ core/src/main/scala/kafka/api/TopicMetadataResponse.scala	(working copy)
@@ -21,9 +21,9 @@
 import kafka.common.ErrorMapping
 
 
-object TopicMetaDataResponse {
+object TopicMetadataResponse {
 
-  def readFrom(buffer: ByteBuffer): TopicMetaDataResponse = {
+  def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
     val versionId = buffer.getShort
     val errorCode = buffer.getShort
 
@@ -32,11 +32,11 @@
     for( i <- 0 until topicCount) {
       topicsMetadata(i) = TopicMetadata.readFrom(buffer)
     }
-    new TopicMetaDataResponse(versionId, topicsMetadata.toSeq, errorCode)
+    new TopicMetadataResponse(versionId, topicsMetadata.toSeq, errorCode)
   }
 }
 
-case class TopicMetaDataResponse(versionId: Short,
+case class TopicMetadataResponse(versionId: Short,
                                  topicsMetadata: Seq[TopicMetadata],
                                  errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse
 {
Index: core/src/main/scala/kafka/api/TopicMetadataRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetadataRequest.scala	(revision 1384928)
+++ core/src/main/scala/kafka/api/TopicMetadataRequest.scala	(working copy)
@@ -33,20 +33,8 @@
 
   /**
    * TopicMetadataRequest has the following format -
-   *
    * number of topics (4 bytes) list of topics (2 bytes + topic.length per topic) detailedMetadata (2 bytes) timestamp (8 bytes) count (4 bytes)
-   *
-   * The detailedMetadata field is a placeholder for requesting various details about partition and log metadata
-   * By default, the value for this field is 0, which means it will just return leader, replica and ISR metadata for
-   * all partitions of the list of topics mentioned in the request.
    */
-  def getDetailedMetadataRequest(requestId: Short): DetailedMetadataRequest = {
-    requestId match {
-      case SegmentMetadata.requestId => SegmentMetadata
-      case NoSegmentMetadata.requestId => NoSegmentMetadata
-      case _ => throw new KafkaException("Unknown detailed metadata request id " + requestId)
-    }
-  }
 
   def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
     val versionId = buffer.getShort
@@ -56,60 +44,27 @@
     for(i <- 0 until numTopics)
       topics += readShortString(buffer, "UTF-8")
     val topicsList = topics.toList
-    val returnDetailedMetadata = getDetailedMetadataRequest(buffer.getShort)
-    var timestamp: Option[Long] = None
-    var count: Option[Int] = None
-    returnDetailedMetadata match {
-      case NoSegmentMetadata =>
-      case SegmentMetadata =>
-        timestamp = Some(buffer.getLong)
-        count = Some(buffer.getInt)
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request "
-                                                    + returnDetailedMetadata.requestId)
-    }
-    debug("topic = %s, detailed metadata request = %d"
-          .format(topicsList.head, returnDetailedMetadata.requestId))
-    new TopicMetadataRequest(versionId, clientId, topics.toList, returnDetailedMetadata, timestamp, count)
+    debug("topic = %s".format(topicsList.head))
+    new TopicMetadataRequest(versionId, clientId, topics.toList)
   }
 }
 
 case class TopicMetadataRequest(val versionId: Short,
                                 val clientId: String,
-                                val topics: Seq[String],
-                                val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
-                                val timestamp: Option[Long] = None, val count: Option[Int] = None)
+                                val topics: Seq[String])
  extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
 def this(topics: Seq[String]) =
-  this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None)
+  this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics)
 
-
-
-
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
-    buffer.putShort(detailedMetadata.requestId)
-    detailedMetadata match {
-      case SegmentMetadata =>
-        buffer.putLong(timestamp.get)
-        buffer.putInt(count.get)
-      case NoSegmentMetadata =>
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
-    }
   }
 
   def sizeInBytes(): Int = {
-    var size: Int = 2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
-                    2 /* detailed metadata */
-    detailedMetadata match {
-      case SegmentMetadata =>
-        size += 8 /* timestamp */ + 4 /* count */
-      case NoSegmentMetadata =>
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
-    }
-    size
+    2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
   }
 }
