diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index a319f2f..c18bc43 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -42,14 +42,20 @@ object TopicMetadataRequest extends Logging {
     val topics = new ListBuffer[String]()
     for(i <- 0 until numTopics)
       topics += readShortString(buffer)
-    new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList)
+    if (buffer.hasRemaining) {
+      val createTopic = buffer.getShort == 1
+      new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList, createTopic)
+    } else {
+      new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList)
+    }
   }
 }
 
 case class TopicMetadataRequest(val versionId: Short,
                                 override val correlationId: Int,
                                 val clientId: String,
-                                val topics: Seq[String])
+                                val topics: Seq[String],
+                                val createTopic: Boolean = true)
  extends RequestOrResponse(Some(RequestKeys.MetadataKey), correlationId){
 
   def this(topics: Seq[String], correlationId: Int) =
@@ -61,6 +67,7 @@ case class TopicMetadataRequest(val versionId: Short,
     writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
+    if(createTopic) buffer.putShort(1.shortValue) else buffer.putShort(0.shortValue)
   }
 
   def sizeInBytes(): Int = {
@@ -68,7 +75,8 @@ case class TopicMetadataRequest(val versionId: Short,
     4 + /* correlation id */
     shortStringLength(clientId) + /* client id */
     4 + /* number of topics */
-    topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
+    topics.foldLeft(0)(_ + shortStringLength(_)) + /* topics */
+    2 /* createTopic */
   }
 
   override def toString(): String = {
@@ -88,9 +96,10 @@ case class TopicMetadataRequest(val versionId: Short,
     topicMetadataRequest.append("Name: " + this.getClass.getSimpleName)
     topicMetadataRequest.append("; Version: " + versionId)
     topicMetadataRequest.append("; CorrelationId: " + correlationId)
+    topicMetadataRequest.append("; CreateTopic: " + createTopic)
     topicMetadataRequest.append("; ClientId: " + clientId)
     if(details)
       topicMetadataRequest.append("; Topics: " + topics.mkString(","))
     topicMetadataRequest.toString()
   }
-}
\ No newline at end of file
+}
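The readFrom/writeTo changes above keep the wire format backward compatible: a request serialized by an old client simply has no trailing short, so `buffer.hasRemaining` is false and `createTopic` falls back to its default of true. A minimal round-trip sketch of the new format (assumes a 0.8.x Kafka classpath; names mirror the patched API):

```scala
import java.nio.ByteBuffer
import kafka.api.TopicMetadataRequest

// Ask for metadata without auto-creating the topic on the broker.
val req = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0,
                                   "metadata-probe", Seq("my-topic"), createTopic = false)

// writeTo appends the two-byte flag; readFrom recovers it via hasRemaining.
val buf = ByteBuffer.allocate(req.sizeInBytes)
req.writeTo(buf)
buf.rewind()
assert(!TopicMetadataRequest.readFrom(buf).createTopic)
```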
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ce7ede3..d2eaef2 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -41,10 +41,11 @@ object ClientUtils extends Logging{
    * @param producerConfig The producer's config
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int,
+                         createTopic: Boolean): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
-    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
+    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq, createTopic)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the
@@ -83,17 +84,17 @@
    * @return topic metadata response
    */
   def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
-                         correlationId: Int = 0): TopicMetadataResponse = {
+                         correlationId: Int = 0, createTopic: Boolean = true): TopicMetadataResponse = {
     val props = new Properties()
     props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
     props.put("client.id", clientId)
     props.put("request.timeout.ms", timeoutMs.toString)
     val producerConfig = new ProducerConfig(props)
-    fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
+    fetchTopicMetadata(topics, brokers, producerConfig, correlationId, createTopic)
   }
 
   /**
-   * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
+   * Parse a list of broker urls in the form host1:port1, host2:port2, ...
    */
   def parseBrokerList(brokerListStr: String): Seq[Broker] = {
     val brokersStr = Utils.parseCsvList(brokerListStr)
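With the ClientUtils change in place, a read-only client can probe for a topic without the broker creating it as a side effect. A usage sketch (broker addresses, client id, and timeout are placeholder assumptions):

```scala
import kafka.client.ClientUtils

val brokers = ClientUtils.parseBrokerList("broker1:9092,broker2:9092")
// correlationId keeps its default of 0; only the new flag is overridden.
val metadata = ClientUtils.fetchTopicMetadata(Set("my-topic"), brokers, "metadata-probe",
                                              2000, createTopic = false).topicsMetadata
metadata.foreach(tm => println(tm.topic + " -> error code " + tm.errorCode))
```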
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 7e6da16..c6b876c 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -26,12 +26,13 @@ import kafka.network.RequestChannel.Response
 class TopicMetadataRequest(val versionId: Short,
                            override val correlationId: Int,
                            val clientId: String,
-                           val topics: java.util.List[String])
+                           val topics: java.util.List[String],
+                           val createTopic: Boolean = true)
     extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey), correlationId) {
 
   val underlying: kafka.api.TopicMetadataRequest = {
     import scala.collection.JavaConversions._
-    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String])
+    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String], createTopic)
   }
 
   def this(topics: java.util.List[String]) =
@@ -53,6 +54,7 @@ class TopicMetadataRequest(val versionId: Short,
     topicMetadataRequest.append("Name: " + this.getClass.getSimpleName)
     topicMetadataRequest.append("; Version: " + versionId)
     topicMetadataRequest.append("; CorrelationId: " + correlationId)
+    topicMetadataRequest.append("; CreateTopic: " + createTopic)
     topicMetadataRequest.append("; ClientId: " + clientId)
     if(details) {
       topicMetadataRequest.append("; Topics: ")
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 13a8aa6..8831da6 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -79,7 +79,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
    */
   def updateInfo(topics: Set[String], correlationId: Int) {
     var topicsMetadata: Seq[TopicMetadata] = Nil
-    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
+    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId, createTopic = true)
     topicsMetadata = topicMetadataResponse.topicsMetadata
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{
@@ -97,7 +97,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
     })
     producerPool.updateProducer(topicsMetadata)
   }
-  
+
 }
 
 case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])
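Note the deliberate asymmetry: BrokerPartitionInfo, the producer's metadata refresh, pins `createTopic = true` so that producing to a brand-new topic still auto-creates it, while the read-only tools further down opt out. A sketch of the producer-side call, assuming a minimal ProducerConfig (property values are placeholders):

```scala
import java.util.Properties
import kafka.client.ClientUtils
import kafka.producer.ProducerConfig

val props = new Properties()
props.put("metadata.broker.list", "broker1:9092")
val producerConfig = new ProducerConfig(props)
val brokers = ClientUtils.parseBrokerList("broker1:9092")

// Mirrors BrokerPartitionInfo.updateInfo: this fetch may auto-create the topic
// when the broker has auto.create.topics.enable=true.
val response = ClientUtils.fetchTopicMetadata(Set("events"), brokers, producerConfig,
                                              correlationId = 0, createTopic = true)
```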
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b668f2..74d80e0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -268,11 +268,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
           acksPending, status.error, status.offset, requiredOffset)
     }
-    
+
     case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
-      def this(key: TopicAndPartition, throwable: Throwable) = 
+      def this(key: TopicAndPartition, throwable: Throwable) =
         this(key, -1L, -1L, Some(throwable))
-      
+
       def errorCode = error match {
         case None => ErrorMapping.NoError
         case Some(error) => ErrorMapping.codeFor(error.getClass.asInstanceOf[Class[Throwable]])
@@ -426,10 +426,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Read from a single topic/partition at the given offset upto maxSize bytes
    */
-  private def readMessageSet(topic: String, 
-                             partition: Int, 
+  private def readMessageSet(topic: String,
+                             partition: Int,
                              offset: Long,
-                             maxSize: Int, 
+                             maxSize: Int,
                              fromReplicaId: Int): (MessageSet, Long) = {
     // check if the current broker is the leader for the partitions
     val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
@@ -437,7 +437,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     else
       replicaManager.getLeaderReplicaIfLocal(topic, partition)
     trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-    val maxOffsetOpt = 
+    val maxOffsetOpt =
       if (Request.isValidBrokerId(fromReplicaId))
         None
       else
@@ -453,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   /**
-   * Service the offset request API 
+   * Service the offset request API
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
@@ -476,7 +476,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val hw = localReplica.highWatermark
           if (allOffsets.exists(_ > hw))
             hw +: allOffsets.dropWhile(_ > hw)
-          else 
+          else
             allOffsets
         }
       }
@@ -500,19 +500,19 @@ class KafkaApis(val requestChannel: RequestChannel,
     val response = OffsetResponse(offsetRequest.correlationId, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
-  
+
   def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     logManager.getLog(topicAndPartition) match {
-      case Some(log) => 
+      case Some(log) =>
         fetchOffsetsBefore(log, timestamp, maxNumOffsets)
-      case None => 
+      case None =>
         if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
           Seq(0L)
         else
          Nil
    }
  }
-  
+
  def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
    val segsArray = log.logSegments.toArray
    var offsetTimeArray: Array[(Long, Long)] = null
@@ -554,12 +554,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     ret.toSeq.sortBy(- _)
   }
 
-  private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
+  private def getTopicMetadata(topics: Set[String], createTopic: Boolean = true): Seq[TopicMetadata] = {
     val topicResponses = metadataCache.getTopicMetadata(topics)
     if (topics.size > 0 && topicResponses.size != topics.size) {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
+        if (topic == OffsetManager.OffsetsTopicName || (config.autoCreateTopicsEnable && createTopic)) {
           try {
             if (topic == OffsetManager.OffsetsTopicName) {
               AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
@@ -590,7 +590,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
+    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet, metadataRequest.createTopic)
     trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
     val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
@@ -842,9 +842,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     def recordDelayedFetchExpired(forFollower: Boolean) {
       val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
         else aggregateNonFollowerFetchRequestMetrics
-      
+
       metrics.expiredRequestMeter.mark()
     }
   }
 }
-
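On the broker side, getTopicMetadata now creates a missing topic only when the server allows auto-creation and the request opted in; the internal offsets topic is always created on demand. Distilled into a tiny predicate (a sketch; the literal topic name is the 0.8.x value of OffsetManager.OffsetsTopicName):

```scala
// Condition under which KafkaApis auto-creates a topic that is missing
// from the metadata cache (simplified from getTopicMetadata above).
def shouldAutoCreate(topic: String, autoCreateTopicsEnable: Boolean, createTopic: Boolean): Boolean =
  topic == "__consumer_offsets" || (autoCreateTopicsEnable && createTopic)
```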
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 9c6064e..ebb8b89 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -57,7 +57,7 @@ object GetOffsetShell {
                            .describedAs("ms")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1000)
-                           
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")
 
@@ -73,8 +73,9 @@ object GetOffsetShell {
     val nOffsets = options.valueOf(nOffsetsOpt).intValue
     val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
 
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs,
+                                                        createTopic = false).topicsMetadata
+    if(topicsMetadata.size != 1 || topicsMetadata(0).errorCode == 3) { // 3 == ErrorMapping.UnknownTopicOrPartitionCode
       System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
         "kafka-list-topic.sh to verify")
       System.exit(1)
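The literal 3 in the check above (and in SimpleConsumerShell below) is the broker's unknown-topic error code, which kafka.common.ErrorMapping names symbolically. A one-line sanity check, assuming the Kafka core jar on the classpath:

```scala
import kafka.common.ErrorMapping

// With createTopic = false, a nonexistent topic now comes back as an error
// entry instead of being silently created; 3 is its wire code.
assert(ErrorMapping.UnknownTopicOrPartitionCode == 3)
```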
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 36314f4..0d511c8 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -93,7 +93,7 @@ object SimpleConsumerShell extends Logging {
                            "skip it instead of halt.")
     val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
                            "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
-    
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.")
 
@@ -125,8 +125,8 @@ object SimpleConsumerShell extends Logging {
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
-    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs, createTopic = false).topicsMetadata
+    if(topicsMetadata.size != 1 || topicsMetadata(0).errorCode == 3) { // 3 == ErrorMapping.UnknownTopicOrPartitionCode
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)
     }
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 35dc071..15fd853 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -88,7 +88,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // issue metadata request with empty list of topics
     var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata",
-      2000, 0).topicsMetadata
+      2000, correlationId = 0).topicsMetadata
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
     assertEquals(2, topicsMetadata.size)
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
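A natural follow-up test, not part of this patch, would assert that a probe with createTopic = false reports a topic as unknown instead of creating it. A hedged sketch in the style of TopicMetadataTest, assuming the suite's existing brokers fixture and imports:

```scala
def testTopicNotAutoCreatedForMetadataProbe {
  // hypothetical topic name; it must not exist before the fetch
  val topic = "testTopicNotAutoCreated"
  val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokers, "TopicMetadataTest-noAutoCreate",
    2000, correlationId = 0, createTopic = false).topicsMetadata
  assertEquals(1, topicsMetadata.size)
  assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, topicsMetadata.head.errorCode)
}
```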