diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index b22ca1d..8a63030 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -23,8 +23,10 @@ import org.apache.kafka.common.protocol.types.Struct;
 public class MetadataRequest extends AbstractRequestResponse {
 
     public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
+    public static final Boolean DEFAULT_CREATE_TOPIC = true;
     private static String TOPICS_KEY_NAME = "topics";
+
     private final List<String> topics;
 
     public MetadataRequest(List<String> topics) {
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 7dca09c..2abd4ed 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -36,21 +36,34 @@ object TopicMetadataRequest extends Logging {
 
   def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
     val versionId = buffer.getShort
+    assert(versionId == 0 || versionId == 1,
+      "Version " + versionId + " is invalid for TopicMetadataRequest. Valid versions are 0 or 1.")
+
     val correlationId = buffer.getInt
+
+    // version 1 specific fields
+    var createTopic: Boolean = org.apache.kafka.common.requests.MetadataRequest.DEFAULT_CREATE_TOPIC.booleanValue
+    if (versionId == 1) {
+      createTopic = buffer.getShort == 1
+    }
+
     val clientId = readShortString(buffer)
     val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue))
     val topics = new ListBuffer[String]()
     for(i <- 0 until numTopics)
       topics += readShortString(buffer)
-    new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList)
+    new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList, createTopic)
   }
 }
 
 case class TopicMetadataRequest(val versionId: Short,
                                 val correlationId: Int,
                                 val clientId: String,
-                                val topics: Seq[String])
+                                val topics: Seq[String],
+                                val createTopic: Boolean = org.apache.kafka.common.requests.MetadataRequest.DEFAULT_CREATE_TOPIC.booleanValue)
  extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
+  assert(versionId == 0 || versionId == 1,
+    "Version " + versionId + " is invalid for TopicMetadataRequest. Valid versions are 0 or 1.")
+
   def this(topics: Seq[String], correlationId: Int) =
     this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics)
@@ -58,6 +71,11 @@ case class TopicMetadataRequest(val versionId: Short,
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
+
+    if(versionId == 1) {
+      if (createTopic) buffer.putShort(1.shortValue) else buffer.putShort(0.shortValue)
+    }
+
     writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
@@ -66,6 +84,7 @@ case class TopicMetadataRequest(val versionId: Short,
   def sizeInBytes(): Int = {
     2 + /* version id */
     4 + /* correlation id */
+    (if (versionId == 1) 2 else 0) + /* createTopic */
     shortStringLength(clientId) + /* client id */
     4 + /* number of topics */
     topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
@@ -89,8 +108,9 @@ case class TopicMetadataRequest(val versionId: Short,
     topicMetadataRequest.append("; Version: " + versionId)
     topicMetadataRequest.append("; CorrelationId: " + correlationId)
     topicMetadataRequest.append("; ClientId: " + clientId)
+    topicMetadataRequest.append("; CreateTopic: " + createTopic)
     if(details)
       topicMetadataRequest.append("; Topics: " + topics.mkString(","))
     topicMetadataRequest.toString()
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ce7ede3..cf03cf3 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -41,10 +41,11 @@ object ClientUtils extends Logging{
    * @param producerConfig The producer's config
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
+  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig,
+                         correlationId: Int, versionId: Short = 0, createTopic: Boolean = true): TopicMetadataResponse = {
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
-    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
+    val topicMetadataRequest = new TopicMetadataRequest(versionId, correlationId, producerConfig.clientId, topics.toSeq, createTopic)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
     // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the
@@ -82,18 +83,18 @@ object ClientUtils extends Logging{
    * @param clientId The client's identifier
    * @return topic metadata response
    */
-  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
-                         correlationId: Int = 0): TopicMetadataResponse = {
+  def fetchTopicMetadataForConsumer(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
+                                    correlationId: Int = 0, versionId: Short = 1, createTopic: Boolean = false): TopicMetadataResponse = {
     val props = new Properties()
     props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
     props.put("client.id", clientId)
     props.put("request.timeout.ms", timeoutMs.toString)
     val producerConfig = new ProducerConfig(props)
-    fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
+    fetchTopicMetadata(topics, brokers, producerConfig, correlationId, versionId, createTopic)
   }
 
   /**
-   * Parse a list of broker urls in the form host1:port1, host2:port2, ... 
+   * Parse a list of broker urls in the form host1:port1, host2:port2, ...
    */
   def parseBrokerList(brokerListStr: String): Seq[Broker] = {
     val brokersStr = Utils.parseCsvList(brokerListStr)
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b9e2bea..f40cb2f 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -63,7 +63,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         trace("Partitions without leader %s".format(noLeaderPartitionSet))
         val brokers = getAllBrokersInCluster(zkClient)
-        val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
+        val topicsMetadata = ClientUtils.fetchTopicMetadataForConsumer(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                             brokers,
                                                             config.clientId,
                                                             config.socketTimeoutMs,
@@ -163,4 +163,4 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index b0b7be1..0790dd4 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -26,12 +26,13 @@ import kafka.network.RequestChannel.Response
 class TopicMetadataRequest(val versionId: Short,
                            val correlationId: Int,
                            val clientId: String,
-                           val topics: java.util.List[String])
+                           val topics: java.util.List[String],
+                           val createTopic: Boolean = true)
     extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
 
   val underlying: kafka.api.TopicMetadataRequest = {
     import scala.collection.JavaConversions._
-    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String])
+    new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String], createTopic)
   }
 
   def this(topics: java.util.List[String]) =
@@ -54,6 +55,7 @@ class TopicMetadataRequest(val versionId: Short,
     topicMetadataRequest.append("; Version: " + versionId)
     topicMetadataRequest.append("; CorrelationId: " + correlationId)
     topicMetadataRequest.append("; ClientId: " + clientId)
+    topicMetadataRequest.append("; createTopic: " + createTopic)
     if(details) {
       topicMetadataRequest.append("; Topics: ")
       val topicIterator = topics.iterator()
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fd5f12e..f357058 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -268,11 +268,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format(
         acksPending, status.error, status.offset, requiredOffset)
   }
-  
+
   case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) {
-    def this(key: TopicAndPartition, throwable: Throwable) = 
+    def this(key: TopicAndPartition, throwable: Throwable) =
       this(key, -1L, -1L, Some(throwable))
-    
+
     def errorCode = error match {
       case None => ErrorMapping.NoError
       case Some(error) => ErrorMapping.codeFor(error.getClass.asInstanceOf[Class[Throwable]])
@@ -426,10 +426,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Read from a single topic/partition at the given offset upto maxSize bytes
    */
-  private def readMessageSet(topic: String, 
-                             partition: Int, 
+  private def readMessageSet(topic: String,
+                             partition: Int,
                              offset: Long,
-                             maxSize: Int, 
+                             maxSize: Int,
                              fromReplicaId: Int): (MessageSet, Long) = {
     // check if the current broker is the leader for the partitions
     val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
@@ -437,7 +437,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     else
       replicaManager.getLeaderReplicaIfLocal(topic, partition)
     trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-    val maxOffsetOpt = 
+    val maxOffsetOpt =
       if (Request.isValidBrokerId(fromReplicaId))
         None
       else
@@ -453,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   /**
-   * Service the offset request API 
+   * Service the offset request API
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
     val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
@@ -476,7 +476,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val hw = localReplica.highWatermark
           if (allOffsets.exists(_ > hw))
             hw +: allOffsets.dropWhile(_ > hw)
-          else 
+          else
             allOffsets
         }
       }
@@ -500,19 +500,19 @@ class KafkaApis(val requestChannel: RequestChannel,
     val response = OffsetResponse(offsetRequest.correlationId, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
-  
+
   def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     logManager.getLog(topicAndPartition) match {
-      case Some(log) => 
+      case Some(log) =>
         fetchOffsetsBefore(log, timestamp, maxNumOffsets)
-      case None => 
+      case None =>
         if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
           Seq(0L)
         else
           Nil
     }
   }
-  
+
   def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = log.logSegments.toArray
     var offsetTimeArray: Array[(Long, Long)] = null
@@ -554,12 +554,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     ret.toSeq.sortBy(- _)
   }
 
-  private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
+  private def getTopicMetadata(topics: Set[String], createTopic: Boolean = true): Seq[TopicMetadata] = {
     val topicResponses = metadataCache.getTopicMetadata(topics)
     if (topics.size > 0 && topicResponses.size != topics.size) {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
+        if (topic == OffsetManager.OffsetsTopicName || (config.autoCreateTopicsEnable && createTopic)) {
           try {
             if (topic == OffsetManager.OffsetsTopicName) {
               AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
@@ -590,7 +590,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
-    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
+    val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet, metadataRequest.createTopic)
     val brokers = metadataCache.getAliveBrokers
     trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
     val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)
@@ -843,9 +843,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     def recordDelayedFetchExpired(forFollower: Boolean) {
       val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
                     else aggregateNonFollowerFetchRequestMetrics
-      
+
       metrics.expiredRequestMeter.mark()
     }
   }
 }
-
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index 8c5b054..7af2f43 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -34,8 +34,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
     // write to temp file and then swap with the existing file
     val temp = new File(file.getAbsolutePath + ".tmp")
 
-    val fileOutputStream = new FileOutputStream(temp)
-    val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
+    val writer = new BufferedWriter(new FileWriter(temp))
     try {
       // write the current version
       writer.write(0.toString)
@@ -51,9 +50,8 @@ class OffsetCheckpoint(val file: File) extends Logging {
         writer.newLine()
       }
 
-      // flush the buffer and then fsync the underlying file
+      // flush and overwrite old file
      writer.flush()
-      fileOutputStream.getFD().sync()
     } finally {
       writer.close()
     }
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 9c6064e..220f594 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -57,7 +57,7 @@ object GetOffsetShell {
                            .describedAs("ms")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1000)
-                           
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.")
 
@@ -73,7 +73,7 @@ object GetOffsetShell {
     val nOffsets = options.valueOf(nOffsetsOpt).intValue
     val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
 
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
+    val topicsMetadata = ClientUtils.fetchTopicMetadataForConsumer(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) + "kafka-list-topic.sh to verify")
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index af47836..29256de 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -92,7 +92,7 @@ object ReplicaVerificationTool extends Logging {
                            .describedAs("ms")
                            .ofType(classOf[java.lang.Long])
                            .defaultsTo(30 * 1000L)
-                           
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")
 
@@ -117,7 +117,7 @@ object ReplicaVerificationTool extends Logging {
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
-    val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
+    val topicsMetadataResponse = ClientUtils.fetchTopicMetadataForConsumer(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
     val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
     val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
         topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false))
@@ -395,4 +395,4 @@ private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartiti
     verificationBarrier.await()
     debug("Done verification")
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 36314f4..9d0fa88 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -93,7 +93,7 @@ object SimpleConsumerShell extends Logging {
                            "skip it instead of halt.")
    val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
                            "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages")
-                           
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.")
 
@@ -125,7 +125,7 @@ object SimpleConsumerShell extends Logging {
     // getting topic metadata
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
+    val topicsMetadata = ClientUtils.fetchTopicMetadataForConsumer(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
       System.exit(1)
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 1bf2667..eb9dd39 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -109,7 +109,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions",
+    val metadata = ClientUtils.fetchTopicMetadataForConsumer(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions",
       2000,0).topicsMetadata
     val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
     val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata
@@ -134,7 +134,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas",
+    val metadata = ClientUtils.fetchTopicMetadataForConsumer(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas",
       2000,0).topicsMetadata
     val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2))
     val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata
@@ -158,7 +158,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
 
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement",
+    val metadata = ClientUtils.fetchTopicMetadataForConsumer(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement",
       2000,0).topicsMetadata
     val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 35dc071..3fd74ad 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -67,7 +67,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topic = "test"
     createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
 
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+    var topicsMetadata = ClientUtils.fetchTopicMetadataForConsumer(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
       2000,0).topicsMetadata
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
@@ -87,7 +87,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
 
     // issue metadata request with empty list of topics
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata",
+    var topicsMetadata = ClientUtils.fetchTopicMetadataForConsumer(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata",
       2000, 0).topicsMetadata
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
     assertEquals(2, topicsMetadata.size)
@@ -106,8 +106,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testAutoCreateTopic {
     // auto create topic
     val topic = "testAutoCreateTopic"
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic",
-      2000,0).topicsMetadata
+    var topicsMetadata = ClientUtils.fetchTopicMetadataForConsumer(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic",
+      2000,0,1,true).topicsMetadata
     assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode)
     assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
     assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
@@ -118,7 +118,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
 
     // retry the metadata for the auto created topic
-    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
+    topicsMetadata = ClientUtils.fetchTopicMetadataForConsumer(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata",
      2000,0).topicsMetadata
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode)
     assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode)
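
For reference, the version 1 wire format introduced above can be exercised end to end with a minimal sketch like the following, built only from the constructors and methods this patch adds; the client id, topic name, and correlation id are illustrative placeholders, not values from the patch:

    import java.nio.ByteBuffer
    import kafka.api.TopicMetadataRequest

    // Version 1 writes a 2-byte createTopic flag between the correlation id
    // and the client id (see writeTo and sizeInBytes above).
    val request = new TopicMetadataRequest(
      versionId = 1.toShort,
      correlationId = 0,
      clientId = "metadata-probe",   // hypothetical client id
      topics = Seq("my-topic"),      // hypothetical topic
      createTopic = false)

    val buffer = ByteBuffer.allocate(request.sizeInBytes())
    request.writeTo(buffer)
    buffer.rewind()

    // Field order on the wire for version 1, as written by writeTo:
    assert(buffer.getShort == 1)   // version id
    assert(buffer.getInt == 0)     // correlation id
    assert(buffer.getShort == 0)   // createTopic flag: 0 = false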
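Callers that only probe for metadata should now go through fetchTopicMetadataForConsumer, which defaults to versionId = 1 and createTopic = false, so looking up a non-existent topic no longer creates it as a side effect; producers keep auto-creation via the original fetchTopicMetadata. A minimal usage sketch, assuming a reachable cluster (the broker addresses and topic are made-up values):

    import kafka.client.ClientUtils

    // Brokers would normally come from a command-line option, as in the tools above.
    val brokers = ClientUtils.parseBrokerList("broker1:9092,broker2:9092")

    // Defaults to versionId = 1 and createTopic = false.
    val response = ClientUtils.fetchTopicMetadataForConsumer(
      Set("my-topic"), brokers, clientId = "metadata-probe", timeoutMs = 2000)

    response.topicsMetadata.foreach(tm => println(tm.topic + ": " + tm.errorCode))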