Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (revision 1387487)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (working copy)
@@ -29,7 +29,7 @@
 import kafka.utils.TestUtils._
 import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
 import kafka.common.ErrorMapping
-import kafka.api.{RequestKeys, TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
+import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetadataRequest}
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -76,7 +76,6 @@
     val partitionMetadata = topicMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
     assertEquals(1, partitionMetadata.head.replicas.size)
   }
 
@@ -90,7 +89,6 @@
     val partitionMetadata = topicMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
     assertEquals(0, partitionMetadata.head.replicas.size)
     assertEquals(None, partitionMetadata.head.leader)
     assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode)
@@ -117,7 +115,7 @@
     val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
 
     // check assertions
-    val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
+    val topicMetadata = TopicMetadataResponse.readFrom(metadataResponse).topicsMetadata
 
     topicMetadata
   }
Index: core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala (revision 1387487)
+++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala (working copy)
@@ -121,8 +121,8 @@
     new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2))
   }
 
-  def createTestTopicMetadataResponse: TopicMetaDataResponse = {
-    new TopicMetaDataResponse(1, Seq(topicmetaData1, topicmetaData2))
+  def createTestTopicMetadataResponse: TopicMetadataResponse = {
+    new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2))
   }
 }
 
@@ -215,7 +215,7 @@
     buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes)
     topicMetadataResponse.writeTo(buffer)
     buffer.rewind()
-    val deserializedTopicMetadataResponse = TopicMetaDataResponse.readFrom(buffer)
+    val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
     assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
                  deserializedTopicMetadataResponse)
   }
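Note (illustrative, not part of the patch): the serialization test above exercises the renamed TopicMetadataResponse end to end. A minimal standalone version of that round trip, using only types from this patch:

    import java.nio.ByteBuffer
    import kafka.api.{TopicMetadata, TopicMetadataResponse}

    // Write a response into a buffer sized by sizeInBytes, then read it back;
    // the deserialized case class should equal the original.
    val response = new TopicMetadataResponse(1, Seq.empty[TopicMetadata])
    val buffer = ByteBuffer.allocate(response.sizeInBytes)
    response.writeTo(buffer)
    buffer.rewind()
    assert(TopicMetadataResponse.readFrom(buffer) == response)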
Index: core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (revision 1387487)
+++ core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (working copy)
@@ -22,40 +22,24 @@
 import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
 
 class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestCases {
+  override def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet =
+    new ByteBufferMessageSet(new kafka.message.ByteBufferMessageSet(compressed, messages: _*))
-  override def createMessageSet(messages: Seq[Message],
-                                compressed: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet =
-    new ByteBufferMessageSet(compressed, getMessageList(messages: _*))
-
+  val msgSeq: Seq[Message] = Seq(new Message("hello".getBytes()), new Message("there".getBytes()))
+
   @Test
   def testEquals() {
-    val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                               messages = getMessageList(new Message("hello".getBytes()),
-                                                                         new Message("there".getBytes())))
-    val moreMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                messages = getMessageList(new Message("hello".getBytes()),
-                                                                          new Message("there".getBytes())))
-
+    val messageList = createMessageSet(msgSeq, NoCompressionCodec)
+    val moreMessages = createMessageSet(msgSeq, NoCompressionCodec)
     assertEquals(messageList, moreMessages)
     assertTrue(messageList.equals(moreMessages))
   }
 
   @Test
   def testEqualsWithCompression () {
-    val messageList = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                               messages = getMessageList(new Message("hello".getBytes()),
-                                                                         new Message("there".getBytes())))
-    val moreMessages = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                                messages = getMessageList(new Message("hello".getBytes()),
-                                                                          new Message("there".getBytes())))
-
+    val messageList = createMessageSet(msgSeq, DefaultCompressionCodec)
+    val moreMessages = createMessageSet(msgSeq, DefaultCompressionCodec)
     assertEquals(messageList, moreMessages)
     assertTrue(messageList.equals(moreMessages))
   }
-
-  private def getMessageList(messages: Message*): java.util.List[Message] = {
-    val messageList = new java.util.ArrayList[Message]()
-    messages.foreach(m => messageList.add(m))
-    messageList
-  }
 }
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala (revision 1387487)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala (working copy)
@@ -21,7 +21,6 @@
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
-import kafka.common.ErrorMapping
 import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
@@ -111,9 +110,7 @@
 
   def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
     val response = doSend(request)
-    val topicMetaDataResponse = TopicMetaDataResponse.readFrom(response.buffer)
-    // try to throw exception based on global error codes
-    ErrorMapping.maybeThrowException(topicMetaDataResponse.errorCode)
+    val topicMetaDataResponse = TopicMetadataResponse.readFrom(response.buffer)
     topicMetaDataResponse.topicsMetadata
   }
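Note (illustrative, not part of the patch): send() no longer throws on a response-level error code, because that field is gone from the wire format; errors now arrive per topic. A sketch of the caller-side check this implies, with the already-constructed SyncProducer left as a parameter:

    import kafka.api.{TopicMetadata, TopicMetadataRequest}
    import kafka.common.ErrorMapping
    import kafka.producer.SyncProducer

    // Fetch metadata and surface any per-topic error codes to the caller.
    def checkedMetadata(producer: SyncProducer, topics: Seq[String]): Seq[TopicMetadata] = {
      val metadata = producer.send(new TopicMetadataRequest(topics))
      metadata.filter(_.errorCode != ErrorMapping.NoError)
              .foreach(tm => println("metadata error for %s: %d".format(tm.topic, tm.errorCode)))
      metadata
    }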
Index: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
===================================================================
--- core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (revision 1387487)
+++ core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (working copy)
@@ -22,7 +22,7 @@
 import kafka.api.LeaderAndIsr
 import kafka.common.StateChangeFailedException
 import java.util.concurrent.atomic.AtomicBoolean
-import org.I0Itec.zkclient.{IZkChildListener}
+import org.I0Itec.zkclient.IZkChildListener
 
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (revision 1387487)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (working copy)
@@ -181,13 +181,11 @@
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        buffer.equals(that.buffer) && initialOffset == that.initialOffset
       case _ => false
     }
   }
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
-
   override def hashCode: Int = {
     var hash = 17
     hash = hash * 31 + buffer.hashCode
Index: core/src/main/scala/kafka/consumer/SimpleConsumer.scala
===================================================================
--- core/src/main/scala/kafka/consumer/SimpleConsumer.scala (revision 1387487)
+++ core/src/main/scala/kafka/consumer/SimpleConsumer.scala (working copy)
@@ -86,6 +86,12 @@
     }
   }
 
+  def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
+    val response = sendRequest(request)
+    val topicMetaDataResponse = TopicMetadataResponse.readFrom(response.buffer)
+    topicMetaDataResponse.topicsMetadata
+  }
+
   /**
    *  Fetch a set of messages from a topic.
    *
@@ -128,4 +134,4 @@
 object FetchRequestAndResponseStat extends KafkaMetricsGroup {
   val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
   val respondSizeHist = newHistogram("FetchResponseSize")
-}
\ No newline at end of file
+}
Index: core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
===================================================================
--- core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala (revision 1387487)
+++ core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala (working copy)
@@ -19,7 +19,7 @@
 import kafka.utils.ZkUtils._
 import kafka.utils.Logging
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.I0Itec.zkclient.{IZkDataListener}
+import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
 
 /**
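Note (illustrative, not part of the patch): the new SimpleConsumer.send mirrors the SyncProducer path, so a consumer can discover topic layout without constructing a producer. A sketch, with host, port and socket settings as placeholder values:

    import kafka.api.TopicMetadataRequest
    import kafka.consumer.SimpleConsumer

    // Ask one broker for the metadata of a topic; the broker address and
    // socket settings below are illustrative only.
    val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024)
    val topicsMetadata = consumer.send(new TopicMetadataRequest(Seq("test-topic")))
    topicsMetadata.foreach(tm => println(tm.topic + ": " + tm.partitionsMetadata.size + " partition(s)"))
    consumer.close()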
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala (revision 1387487)
+++ core/src/main/scala/kafka/server/KafkaApis.scala (working copy)
@@ -380,38 +380,38 @@
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
     var errorCode = ErrorMapping.NoError
     val config = replicaManager.config
-    try {
-      val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
-      metadataRequest.topics.zip(topicMetadataList).foreach(
-        topicAndMetadata =>{
-          val topic = topicAndMetadata._1
-          topicAndMetadata._2.errorCode match {
-            case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
-            case ErrorMapping.UnknownTopicOrPartitionCode =>
+    val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+    metadataRequest.topics.zip(topicMetadataList).foreach(
+      topicAndMetadata => {
+        val topic = topicAndMetadata._1
+        topicAndMetadata._2.errorCode match {
+          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
+          case ErrorMapping.UnknownTopicOrPartitionCode =>
+            try {
               /* check if auto creation of topics is turned on */
-              if(config.autoCreateTopics) {
+              if (config.autoCreateTopics) {
                 CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor, topicNameValidator = topicNameValidator)
                 info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                      .format(topic, config.numPartitions, config.defaultReplicationFactor))
                 val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+                topicsMetadata += newTopicMetadata
                 newTopicMetadata.errorCode match {
-                  case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
-                  case _ =>
-                    throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
+                  case ErrorMapping.NoError =>
+                  case _ => throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
                 }
               }
-            case _ => error("Error while fetching topic metadata for topic " + topic,
-                            ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
-          }
-        })
-    } catch {
-      case e => error("Error while retrieving topic metadata", e)
-        // convert exception type to error code
-        errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
-    }
+            } catch {
+              case e => error("Error while retrieving topic metadata", e)
+            }
+          case _ =>
+            error("Error while fetching topic metadata for topic " + topic,
+                  ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
+            topicsMetadata += topicAndMetadata._2
+        }
+      })
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
-    val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
+    val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
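Note (illustrative, not part of the patch): the broker-side change narrows the try/catch from the whole request down to the auto-create step, so a failure on one topic no longer aborts the response or sets a global error code; every requested topic now comes back carrying its own errorCode. A self-contained sketch of that per-topic dispatch shape, with stand-in codes:

    import kafka.common.ErrorMapping

    // Stand-in error codes for two topics; each is handled independently.
    val perTopicCodes = Seq("good" -> ErrorMapping.NoError,
                            "missing" -> ErrorMapping.UnknownTopicOrPartitionCode)
    perTopicCodes.foreach { case (topic, code) =>
      code match {
        case ErrorMapping.NoError => println(topic + ": include metadata as-is")
        case ErrorMapping.UnknownTopicOrPartitionCode => println(topic + ": attempt auto-create, include the new metadata")
        case _ => println(topic + ": include metadata carrying its error code")
      }
    }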
Index: core/src/main/scala/kafka/javaapi/Implicits.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/Implicits.scala (revision 1387487)
+++ core/src/main/scala/kafka/javaapi/Implicits.scala (working copy)
@@ -17,16 +17,29 @@
 package kafka.javaapi
 
 import kafka.utils.Logging
+import scala.collection.JavaConversions.asList
 
 private[javaapi] object Implicits extends Logging {
-  implicit def javaMessageSetToScalaMessageSet(messageSet: kafka.javaapi.message.ByteBufferMessageSet):
-    kafka.message.ByteBufferMessageSet = messageSet.underlying
 
   implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
-    kafka.javaapi.message.ByteBufferMessageSet = {
-    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer, messageSet.initialOffset)
-  }
+    kafka.javaapi.message.ByteBufferMessageSet =
+    new kafka.javaapi.message.ByteBufferMessageSet(messageSet)
 
   implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =
-    new kafka.javaapi.FetchResponse(response.versionId, response.correlationId, response.data)
+    new kafka.javaapi.FetchResponse(response)
+
+  implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
+    java.util.List[kafka.javaapi.TopicMetadata] =
+    topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
+
+  implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
+    java.util.List[kafka.javaapi.PartitionMetadata] =
+    partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
+
+  implicit def optionToJavaRef[T](opt: Option[T]): T = {
+    opt match {
+      case Some(obj) => obj
+      case None => null.asInstanceOf[T]
+    }
+  }
 }
Index: core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (revision 1387487)
+++ core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (working copy)
@@ -16,21 +16,10 @@
  */
 package kafka.javaapi.message
 
-import java.nio.ByteBuffer
 import kafka.message._
 
-class ByteBufferMessageSet(private val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet {
-  val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, initialOffset)
-  def this(buffer: ByteBuffer) = this(buffer, 0L)
+class ByteBufferMessageSet(private val underlying: kafka.message.ByteBufferMessageSet) extends MessageSet {
 
-  def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*), 0L)
-  }
-
-  def this(messages: java.util.List[Message]) {
-    this(NoCompressionCodec, messages)
-  }
-
   def validBytes: Long = underlying.validBytes
 
   override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
@@ -53,13 +42,11 @@
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        underlying.equals(that.underlying)
       case _ => false
     }
   }
 
-  def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
-
   override def hashCode: Int = underlying.hashCode
 }
Index: core/src/main/scala/kafka/javaapi/FetchResponse.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/FetchResponse.scala (revision 1387487)
+++ core/src/main/scala/kafka/javaapi/FetchResponse.scala (working copy)
@@ -17,16 +17,8 @@
 
 package kafka.javaapi
 
-import kafka.api.PartitionData
-import kafka.common.TopicAndPartition
+class FetchResponse(private val underlying: kafka.api.FetchResponse) {
 
-
-class FetchResponse( val versionId: Short,
-                     val correlationId: Int,
-                     private val data: Map[TopicAndPartition, PartitionData] ) {
-
-  private val underlying = kafka.api.FetchResponse(versionId, correlationId, data)
-
   def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = {
     import Implicits._
     underlying.messageSet(topic, partition)
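Note (illustrative, not part of the patch): optionToJavaRef translates Scala's Option into the null convention Java callers expect; it is what lets the Java-facing PartitionMetadata.leader, added later in this patch, return a bare Broker. A sketch, which compiles only inside the kafka.javaapi package since Implicits is private[javaapi]:

    import kafka.javaapi.Implicits._

    // Some(x) unwraps to x; None becomes null, so Java callers must null-check.
    val present: Option[String] = Some("broker-1")
    val absent: Option[String] = None
    val a: String = present // "broker-1"
    val b: String = absent  // null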
+   */
+  def send(request: kafka.javaapi.TopicMetadataRequest): java.util.List[kafka.javaapi.TopicMetadata] = {
+    import kafka.javaapi.Implicits._
+    underlying.send(request.underlying)
+  }
+
+  /**
    *  Get a list of valid offsets (up to maxSize) before the given time.
    *  The result is a list of offsets, in descending order.
    *
Index: core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (revision 1387487)
+++ core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (working copy)
@@ -62,7 +62,7 @@
                                  val enableFetcher: Boolean) // for testing only
     extends ConsumerConnector {
 
-  val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
+  private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
 
   def this(config: ConsumerConfig) = this(config, true)
 
Index: core/src/main/scala/kafka/javaapi/TopicMetadata.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/TopicMetadata.scala (revision 0)
+++ core/src/main/scala/kafka/javaapi/TopicMetadata.scala (working copy)
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import kafka.cluster.Broker
+import scala.collection.JavaConversions.asList
+
+class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
+  def topic: String = underlying.topic
+
+  def partitionsMetadata: java.util.List[PartitionMetadata] = {
+    import kafka.javaapi.Implicits._
+    underlying.partitionsMetadata
+  }
+
+  def errorCode: Short = underlying.errorCode
+
+  def sizeInBytes: Int = underlying.sizeInBytes
+}
+
+
+class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
+  def partitionId: Int = underlying.partitionId
+
+  def leader: Broker = {
+    import kafka.javaapi.Implicits._
+    underlying.leader
+  }
+
+  def replicas: java.util.List[Broker] = underlying.replicas
+
+  def isr: java.util.List[Broker] = underlying.isr
+
+  def errorCode: Short = underlying.errorCode
+
+  def sizeInBytes: Int = underlying.sizeInBytes
+}
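Note (illustrative, not part of the patch): a Scala sketch of the Java-facing call path these wrappers enable, using the kafka.javaapi.TopicMetadataRequest defined in the next file; the broker address and socket settings are placeholders:

    import scala.collection.JavaConversions._

    // Build the Java-facing request and read the wrapped metadata back.
    val consumer = new kafka.javaapi.consumer.SimpleConsumer("localhost", 9092, 100000, 64 * 1024)
    val request = new kafka.javaapi.TopicMetadataRequest(java.util.Arrays.asList("test-topic"))
    val metadata: java.util.List[kafka.javaapi.TopicMetadata] = consumer.send(request)
    for (tm <- metadata)
      println(tm.topic + " errorCode=" + tm.errorCode)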
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import kafka.api._
+import java.nio.ByteBuffer
+import scala.collection.JavaConversions.asBuffer
+
+class TopicMetadataRequest(val versionId: Short,
+                           val clientId: String,
+                           val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
+  val underlying: kafka.api.TopicMetadataRequest =
+    new kafka.api.TopicMetadataRequest(versionId, clientId, topics)
+
+  def this(topics: java.util.List[String]) =
+    this(kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+
+  def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
+
+  def sizeInBytes: Int = underlying.sizeInBytes()
+}
Index: core/src/main/scala/kafka/admin/AdminUtils.scala
===================================================================
--- core/src/main/scala/kafka/admin/AdminUtils.scala (revision 1387487)
+++ core/src/main/scala/kafka/admin/AdminUtils.scala (working copy)
@@ -121,15 +121,14 @@
             case e => throw new ReplicaNotAvailableException(e)
           }
 
-          new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError,
-            None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+          new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
         }catch {
-          case e: ReplicaNotAvailableException => new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-            ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-            None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
-          case le: LeaderNotAvailableException => new PartitionMetadata(partition, None, replicaInfo, isrInfo,
-            ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]),
-            None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+          case e: ReplicaNotAvailableException =>
+            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
+                                  ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+          case le: LeaderNotAvailableException =>
+            new PartitionMetadata(partition, None, replicaInfo, isrInfo,
+                                  ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]))
         }
       }
       new TopicMetadata(topic, partitionMetadata)
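Note (illustrative, not part of the patch): with the logMetadata placeholder gone, the error paths above build PartitionMetadata from exactly five values. A sketch of the leader-not-available case, mirroring the codeFor call used in AdminUtils:

    import kafka.api.PartitionMetadata
    import kafka.common.{ErrorMapping, LeaderNotAvailableException}

    // A partition entry with no leader, carrying the mapped error code.
    val code = ErrorMapping.codeFor(classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]])
    val partitionMetadata = new PartitionMetadata(0, None, Seq.empty, Seq.empty, code)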
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
-
-
-object TopicMetaDataResponse {
-
-  def readFrom(buffer: ByteBuffer): TopicMetaDataResponse = {
-    val versionId = buffer.getShort
-    val errorCode = buffer.getShort
-
-    val topicCount = buffer.getInt
-    val topicsMetadata = new Array[TopicMetadata](topicCount)
-    for( i <- 0 until topicCount) {
-      topicsMetadata(i) = TopicMetadata.readFrom(buffer)
-    }
-    new TopicMetaDataResponse(versionId, topicsMetadata.toSeq, errorCode)
-  }
-}
-
-case class TopicMetaDataResponse(versionId: Short,
-                                 topicsMetadata: Seq[TopicMetadata],
-                                 errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse
-{
-  val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes) + 2
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    /* error code */
-    buffer.putShort(errorCode)
-    /* topic metadata */
-    buffer.putInt(topicsMetadata.length)
-    topicsMetadata.foreach(_.writeTo(buffer))
-  }
-}
\ No newline at end of file
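Note (illustrative, not part of the patch): deleting this file also drops the response-level errorCode from the wire format, so the header shrinks from versionId (2 bytes) + errorCode (2 bytes) + topic count (4 bytes) to versionId (2 bytes) + topic count (4 bytes); per-topic and per-partition errorCode fields carry errors instead. A quick size check against the replacement class below:

    import kafka.api.{TopicMetadata, TopicMetadataResponse}

    // 2 bytes of versionId + 4 bytes of topic count, and no global error code.
    val empty = new TopicMetadataResponse(1, Seq.empty[TopicMetadata])
    assert(empty.sizeInBytes == 6)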
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+object TopicMetadataResponse {
+
+  def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
+    val versionId = buffer.getShort
+    val topicCount = buffer.getInt
+    val topicsMetadata = new Array[TopicMetadata](topicCount)
+    for( i <- 0 until topicCount) {
+      topicsMetadata(i) = TopicMetadata.readFrom(buffer)
+    }
+    new TopicMetadataResponse(versionId, topicsMetadata.toSeq)
+  }
+}
+
+case class TopicMetadataResponse(versionId: Short,
+                                 topicsMetadata: Seq[TopicMetadata]) extends RequestOrResponse
+{
+  val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes)
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    /* topic metadata */
+    buffer.putInt(topicsMetadata.length)
+    topicsMetadata.foreach(_.writeTo(buffer))
+  }
+}
Index: core/src/main/scala/kafka/api/TopicMetadata.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetadata.scala (revision 1387487)
+++ core/src/main/scala/kafka/api/TopicMetadata.scala (working copy)
@@ -51,10 +51,6 @@
 case object LeaderExists extends LeaderRequest { val requestId: Byte = 1 }
 case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 }
 
-sealed trait LogSegmentMetadataRequest { def requestId: Byte }
-case object LogSegmentMetadataExists extends LogSegmentMetadataRequest { val requestId: Byte = 1 }
-case object LogSegmentMetadataDoesNotExist extends LogSegmentMetadataRequest { val requestId: Byte = 0 }
-
 object TopicMetadata {
 
   def readFrom(buffer: ByteBuffer): TopicMetadata = {
@@ -114,28 +110,7 @@
       isr(i) = Broker.readFrom(buffer)
     }
 
-    val doesLogMetadataExist = getLogSegmentMetadataRequest(buffer.get)
-    val logMetadata = doesLogMetadataExist match {
-      case LogSegmentMetadataExists =>
-        val numLogSegments = getIntInRange(buffer, "total number of log segments", (0, Int.MaxValue))
-        val totalDataSize = getLongInRange(buffer, "total data size", (0, Long.MaxValue))
-        val numSegmentMetadata = getIntInRange(buffer, "number of log segment metadata", (0, Int.MaxValue))
-        val segmentMetadata = numSegmentMetadata match {
-          case 0 => None
-          case _ =>
-            val metadata = new ListBuffer[LogSegmentMetadata]()
-            for(i <- 0 until numSegmentMetadata) {
-              val beginningOffset = getLongInRange(buffer, "beginning offset", (0, Long.MaxValue))
-              val lastModified = getLongInRange(buffer, "last modified time", (0, Long.MaxValue))
-              val size = getLongInRange(buffer, "size of log segment", (0, Long.MaxValue))
-              metadata += new LogSegmentMetadata(beginningOffset, lastModified, size)
-            }
-            Some(metadata)
-        }
-        Some(new LogMetadata(numLogSegments, totalDataSize, segmentMetadata))
-      case LogSegmentMetadataDoesNotExist => None
-    }
-    new PartitionMetadata(partitionId, leader, replicas, isr, errorCode, logMetadata)
+    new PartitionMetadata(partitionId, leader, replicas, isr, errorCode)
   }
 
   private def getLeaderRequest(requestId: Byte): LeaderRequest = {
@@ -145,17 +120,10 @@
       case _ => throw new KafkaException("Unknown leader request id " + requestId)
     }
   }
-
-  private def getLogSegmentMetadataRequest(requestId: Byte): LogSegmentMetadataRequest = {
-    requestId match {
-      case LogSegmentMetadataExists.requestId => LogSegmentMetadataExists
-      case LogSegmentMetadataDoesNotExist.requestId => LogSegmentMetadataDoesNotExist
-    }
-  }
 }
 
 case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
-                             errorCode: Short = ErrorMapping.NoError, logMetadata: Option[LogMetadata] = None) {
+                             errorCode: Short = ErrorMapping.NoError) {
   def sizeInBytes: Int = {
     var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/
@@ -169,11 +137,6 @@
     size += 2 /* number of in sync replicas */
     size += isr.foldLeft(0)(_ + _.sizeInBytes)
-    size += 1 /* if log segment metadata exists */
-    logMetadata match {
-      case Some(metadata) => size += metadata.sizeInBytes
-      case None =>
-    }
     debug("Size of partition metadata = " + size)
     size
   }
@@ -198,54 +161,7 @@
     /* number of in-sync replicas */
     buffer.putShort(isr.size.toShort)
     isr.foreach(r => r.writeTo(buffer))
-
-    /* if log segment metadata exists */
-    logMetadata match {
-      case Some(metadata) =>
-        buffer.put(LogSegmentMetadataExists.requestId)
-        metadata.writeTo(buffer)
-      case None => buffer.put(LogSegmentMetadataDoesNotExist.requestId)
-    }
-
   }
 }
 
-case class LogMetadata(numLogSegments: Int, totalSize: Long, logSegmentMetadata: Option[Seq[LogSegmentMetadata]]) {
-  def sizeInBytes: Int = {
-    var size: Int = 4 /* num log segments */ + 8 /* total data size */ + 4 /* number of log segment metadata */
-    logSegmentMetadata match {
-      case Some(segmentMetadata) => size += segmentMetadata.foldLeft(0)(_ + _.sizeInBytes)
-      case None =>
-    }
-    debug("Size of log metadata = " + size)
-    size
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(numLogSegments)
-    buffer.putLong(totalSize)
-    /* if segment metadata exists */
-    logSegmentMetadata match {
-      case Some(segmentMetadata) =>
-        /* number of log segments */
-        buffer.putInt(segmentMetadata.size)
-        segmentMetadata.foreach(m => m.writeTo(buffer))
-      case None =>
-        buffer.putInt(0)
-    }
-  }
-}
-
-case class LogSegmentMetadata(beginningOffset: Long, lastModified: Long, size: Long) {
-  def sizeInBytes: Int = {
-    8 /* beginning offset */ + 8 /* last modified timestamp */ + 8 /* log segment size in bytes */
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putLong(beginningOffset)
-    buffer.putLong(lastModified)
-    buffer.putLong(size)
-  }
-}
-
-
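Note (illustrative, not part of the patch): after this change sizeInBytes counts only the error code, partition id, leader flag and the broker lists, and readFrom/writeTo agree with it. A sketch for the smallest entry, a leaderless partition with no replicas; the per-field byte counts in the comment are my reading of the code above:

    import kafka.api.PartitionMetadata

    // errorCode (2) + partitionId (4) + leader flag (1) + replica count (2)
    // + isr count (2); the leader and broker lists contribute nothing here.
    val pm = new PartitionMetadata(0, None, Seq.empty)
    println("partition entry bytes: " + pm.sizeInBytes)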
Index: core/src/main/scala/kafka/api/TopicMetadataRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/TopicMetadataRequest.scala (revision 1387487)
+++ core/src/main/scala/kafka/api/TopicMetadataRequest.scala (working copy)
@@ -21,32 +21,15 @@
 import kafka.utils.Utils._
 import collection.mutable.ListBuffer
 import kafka.utils._
-import kafka.common.KafkaException
 
-sealed trait DetailedMetadataRequest { def requestId: Short }
-case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] }
-case object NoSegmentMetadata extends DetailedMetadataRequest { val requestId = 0.asInstanceOf[Short] }
-
 object TopicMetadataRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultClientId = ""
 
   /**
   * TopicMetadataRequest has the following format -
   *
   * number of topics (4 bytes) list of topics (2 bytes + topic.length per topic) detailedMetadata (2 bytes) timestamp (8 bytes) count (4 bytes)
-   *
-   * The detailedMetadata field is a placeholder for requesting various details about partition and log metadata
-   * By default, the value for this field is 0, which means it will just return leader, replica and ISR metadata for
-   * all partitions of the list of topics mentioned in the request.
   */
-  def getDetailedMetadataRequest(requestId: Short): DetailedMetadataRequest = {
-    requestId match {
-      case SegmentMetadata.requestId => SegmentMetadata
-      case NoSegmentMetadata.requestId => NoSegmentMetadata
-      case _ => throw new KafkaException("Unknown detailed metadata request id " + requestId)
-    }
-  }
 
   def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
     val versionId = buffer.getShort
@@ -56,60 +39,27 @@
     for(i <- 0 until numTopics)
       topics += readShortString(buffer, "UTF-8")
     val topicsList = topics.toList
-    val returnDetailedMetadata = getDetailedMetadataRequest(buffer.getShort)
-    var timestamp: Option[Long] = None
-    var count: Option[Int] = None
-    returnDetailedMetadata match {
-      case NoSegmentMetadata =>
-      case SegmentMetadata =>
-        timestamp = Some(buffer.getLong)
-        count = Some(buffer.getInt)
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request "
-        + returnDetailedMetadata.requestId)
-    }
-    debug("topic = %s, detailed metadata request = %d"
-      .format(topicsList.head, returnDetailedMetadata.requestId))
-    new TopicMetadataRequest(versionId, clientId, topics.toList, returnDetailedMetadata, timestamp, count)
+    debug("topic = %s".format(topicsList.head))
+    new TopicMetadataRequest(versionId, clientId, topics.toList)
   }
 }
 
 case class TopicMetadataRequest(val versionId: Short,
                                 val clientId: String,
-                                val topics: Seq[String],
-                                val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
-                                val timestamp: Option[Long] = None, val count: Option[Int] = None)
+                                val topics: Seq[String])
  extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
   def this(topics: Seq[String]) =
-    this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None)
+    this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics)
 
-
-
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
-    buffer.putShort(detailedMetadata.requestId)
-    detailedMetadata match {
-      case SegmentMetadata =>
-        buffer.putLong(timestamp.get)
-        buffer.putInt(count.get)
-      case NoSegmentMetadata =>
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
-    }
   }
 
   def sizeInBytes(): Int = {
-    var size: Int = 2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
-      2 /* detailed metadata */
-    detailedMetadata match {
-      case SegmentMetadata =>
-        size += 8 /* timestamp */ + 4 /* count */
-      case NoSegmentMetadata =>
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
-    }
-    size
+    2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
  }
 }
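Note (illustrative, not part of the patch): with detailedMetadata, timestamp and count gone, a request round-trips through the simplified readFrom/writeTo unchanged. A minimal check in the style of RpcDataSerializationTest:

    import java.nio.ByteBuffer
    import kafka.api.TopicMetadataRequest

    // Serialize with the trimmed layout (versionId, clientId, topic list)
    // and read it back into an equal case class.
    val request = new TopicMetadataRequest(Seq("topic1", "topic2"))
    val buffer = ByteBuffer.allocate(request.sizeInBytes())
    request.writeTo(buffer)
    buffer.rewind()
    assert(TopicMetadataRequest.readFrom(buffer) == request)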