diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index eea270a..5108bab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -77,9 +77,6 @@ public class NetworkClient implements KafkaClient { /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ private boolean metadataFetchInProgress; - /* the last timestamp when no broker node is available to connect */ - private long lastNoNodeAvailableMs; - public NetworkClient(Selectable selector, Metadata metadata, String clientId, @@ -97,7 +94,6 @@ public class NetworkClient implements KafkaClient { this.correlation = 0; this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE); this.metadataFetchInProgress = false; - this.lastNoNodeAvailableMs = 0; } /** @@ -166,10 +162,7 @@ public class NetworkClient implements KafkaClient { } // should we update our metadata? 
- long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); - long timeToNextReconnectAttempt = this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now; - // if there is no node available to connect, back off refreshing metadata - long metadataTimeout = Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt); + long metadataTimeout = metadata.timeToNextUpdate(now); if (!this.metadataFetchInProgress && metadataTimeout == 0) maybeUpdateMetadata(sends, now); @@ -348,8 +341,8 @@ public class NetworkClient implements KafkaClient { /** * Create a metadata request for the given topics */ - private ClientRequest metadataRequest(long now, int node, Set topics) { - MetadataRequest metadata = new MetadataRequest(new ArrayList(topics)); + private ClientRequest metadataRequest(long now, int node, Set topics, Boolean createTopic) { + MetadataRequest metadata = new MetadataRequest(new ArrayList(topics), createTopic); RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); return new ClientRequest(now, true, send, null); } @@ -361,8 +354,6 @@ public class NetworkClient implements KafkaClient { Node node = this.leastLoadedNode(now); if (node == null) { log.debug("Give up sending metadata request since no node is available"); - // mark the timestamp for no node available to connect - this.lastNoNodeAvailableMs = now; return; } @@ -370,13 +361,13 @@ public class NetworkClient implements KafkaClient { if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { Set topics = metadata.topics(); this.metadataFetchInProgress = true; - ClientRequest metadataRequest = metadataRequest(now, node.id(), topics); + ClientRequest metadataRequest = metadataRequest(now, node.id(), topics, metadata.createTopic()); log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); sends.add(metadataRequest.request()); this.inFlightRequests.add(metadataRequest); } else if 
(connectionStates.canConnect(node.id(), now)) { // we don't have a connection to this node right now, make one - log.debug("Init connection to node {} for sending metadata request in the next iteration", node.id()); + log.debug("Initiating connection to node {} for sending metadata request in the next iteration", node.id()); initiateConnect(node, now); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 1d30f9e..5f4aa8a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -3,9 +3,9 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. @@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory; * A class encapsulating some of the logic around metadata. *

* This class is shared by the client thread (for partitioning) and the background sender thread. - * + * * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metdata for a * topic we don't have any metadata for it will trigger a metadata update. */ @@ -39,7 +39,7 @@ public final class Metadata { private Cluster cluster; private boolean needUpdate; private final Set topics; - + private boolean createTopic; /** * Create a metadata instance with reasonable defaults */ @@ -54,6 +54,17 @@ public final class Metadata { * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh */ public Metadata(long refreshBackoffMs, long metadataExpireMs) { + this(refreshBackoffMs, metadataExpireMs, true); + } + + /** + * Create a new Metadata instance + * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy + * polling + * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh + * @param createTopic whether topics may be auto-created on the broker if they don't exist + */ + public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean createTopic) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; this.lastRefreshMs = 0L; @@ -61,8 +72,8 @@ public final class Metadata { this.cluster = Cluster.empty(); this.needUpdate = false; this.topics = new HashSet(); + this.createTopic = createTopic; } - /** * Get the current cluster info without blocking */ @@ -121,6 +133,10 @@ public final class Metadata { return new HashSet(this.topics); } + public boolean createTopic() { + return createTopic; + } + /** * Update the cluster metadata */ @@ -139,11 +155,4 @@ public final class Metadata { public synchronized long lastUpdate() { return this.lastRefreshMs; } - - /** - * The metadata refresh backoff in ms - */ - public long refreshBackoff() { - return refreshBackoffMs; - } } diff 
--git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 7517b87..761fc67 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.protocol; import static org.apache.kafka.common.protocol.types.Type.BYTES; +import static org.apache.kafka.common.protocol.types.Type.INT8; import static org.apache.kafka.common.protocol.types.Type.INT16; import static org.apache.kafka.common.protocol.types.Type.INT32; import static org.apache.kafka.common.protocol.types.Type.INT64; @@ -47,6 +48,11 @@ public class Protocol { new ArrayOf(STRING), "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics.")); + public static Schema METADATA_REQUEST_V1 = new Schema(new Field("create_topic", INT8, "override the creation of topic if it doesn't exist."), + new Field("topics", + new ArrayOf(STRING), + "An array of topics to fetch metadata for. 
If no topics are specified fetch metadtata for all topics.")); + public static Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."), new Field("host", STRING, "The hostname of the broker."), new Field("port", INT32, "The port on which the broker accepts requests.")); @@ -76,8 +82,8 @@ public class Protocol { "Host and port information for all brokers."), new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0))); - public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0 }; - public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0 }; + public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0, METADATA_REQUEST_V1 }; + public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0, METADATA_RESPONSE_V0 }; /* Produce api */ diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 444e69e..a6505ae 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -45,7 +45,7 @@ public class Struct { /** * Return the value of the given pre-validated field, or if the value is missing return the default value. - * + * * @param field The field for which to get the default value * @throws SchemaException if the field has no value and has no default. */ @@ -61,7 +61,7 @@ public class Struct { /** * Get the value for the field directly by the field index with no lookup needed (faster!) - * + * * @param field The field to look up * @return The value for that field. */ @@ -72,7 +72,7 @@ public class Struct { /** * Get the record value for the field with the given name by doing a hash table lookup (slower!) 
- * + * * @param name The name of the field * @return The value in the record */ @@ -100,6 +100,14 @@ public class Struct { return (Struct) get(name); } + public Byte getByte(Field field) { + return (Byte) get(field); + } + + public Byte getByte(String field) { + return (Byte) get(field); + } + public Short getShort(Field field) { return (Short) get(field); } @@ -150,7 +158,7 @@ public class Struct { /** * Set the given field to the specified value - * + * * @param field The field * @param value The value */ @@ -162,7 +170,7 @@ public class Struct { /** * Set the field specified by the given name to the value - * + * * @param name The name of the field * @param value The value to set */ @@ -178,7 +186,7 @@ public class Struct { * Create a struct for the schema of a container type (struct or array). * Note that for array type, this method assumes that the type is an array of schema and creates a struct * of that schema. Arrays of other types can't be instantiated with this method. - * + * * @param field The field to create an instance of * @return The struct */ @@ -196,7 +204,7 @@ public class Struct { /** * Create a struct instance for the given field which must be a container type (struct or array) - * + * * @param field The name of the field to create (field must be a schema type) * @return The struct */ diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index b22ca1d..330726d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -23,14 +23,34 @@ import org.apache.kafka.common.protocol.types.Struct; public class MetadataRequest extends AbstractRequestResponse { public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id); + public static final Boolean DEFAULT_CREATE_TOPIC = true; private static String 
TOPICS_KEY_NAME = "topics"; - + private static String CREATE_TOPIC_KEY_NAME = "create_topic"; + private final Boolean createTopic; private final List topics; + /** + * Constructor for version 0. + * @param topics + */ public MetadataRequest(List topics) { + super(new Struct(ProtoUtils.requestSchema(ApiKeys.METADATA.id, 0))); + struct.set(TOPICS_KEY_NAME, topics.toArray()); + this.topics = topics; + this.createTopic = DEFAULT_CREATE_TOPIC; + } + + /** + * Constructor for version 1. + * @param topics + * @param createTopic + */ + public MetadataRequest(List topics, Boolean createTopic) { super(new Struct(curSchema)); struct.set(TOPICS_KEY_NAME, topics.toArray()); + struct.set(CREATE_TOPIC_KEY_NAME, (byte) (createTopic ? 1 : 0)); this.topics = topics; + this.createTopic = createTopic; } public MetadataRequest(Struct struct) { @@ -40,13 +60,24 @@ public class MetadataRequest extends AbstractRequestResponse { for (Object topicObj: topicArray) { topics.add((String) topicObj); } + // This field only exists in v1. + if(struct.hasField(CREATE_TOPIC_KEY_NAME)) + createTopic = struct.getByte(CREATE_TOPIC_KEY_NAME) == 1;
 + else + createTopic = DEFAULT_CREATE_TOPIC; + } public List topics() { return topics; } + public static MetadataRequest parse(ByteBuffer buffer, int versionId) { + Schema schema = ProtoUtils.requestSchema(ApiKeys.METADATA.id, versionId); + return new MetadataRequest(((Struct) schema.read(buffer))); + } + public static MetadataRequest parse(ByteBuffer buffer) { return new MetadataRequest(((Struct) curSchema.read(buffer))); } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index df37fc6..b372a05 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -124,7 +124,7 @@ public class RequestResponseTest { } private AbstractRequestResponse createMetadataRequest() { - return new MetadataRequest(Arrays.asList("topic1")); + return new MetadataRequest(Arrays.asList("topic1"), true); } private AbstractRequestResponse createMetadataResponse() { diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index 7dca09c..b0c0c43 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -26,7 +26,7 @@ import kafka.network.RequestChannel.Response import kafka.utils.Logging object TopicMetadataRequest extends Logging { - val CurrentVersion = 0.shortValue + val CurrentVersion = 1.shortValue val DefaultClientId = "" /** @@ -36,21 +36,34 @@ object TopicMetadataRequest extends Logging { def readFrom(buffer: ByteBuffer): TopicMetadataRequest = { val versionId = buffer.getShort + assert(versionId == 0 || versionId == 1, + "Version " + versionId + " is invalid for TopicMetadataRequest. 
Valid versions are 0 or 1.") + val correlationId = buffer.getInt val clientId = readShortString(buffer) + + // version 1 specific fields + var createTopic: Boolean = org.apache.kafka.common.requests.MetadataRequest.DEFAULT_CREATE_TOPIC.booleanValue + if (versionId == 1) { + createTopic = buffer.get == 1 + } + val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue)) val topics = new ListBuffer[String]() for(i <- 0 until numTopics) topics += readShortString(buffer) - new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList) + new TopicMetadataRequest(versionId, correlationId, clientId, topics.toList, createTopic) } } case class TopicMetadataRequest(val versionId: Short, val correlationId: Int, val clientId: String, - val topics: Seq[String]) + val topics: Seq[String], + val createTopic: Boolean = org.apache.kafka.common.requests.MetadataRequest.DEFAULT_CREATE_TOPIC.booleanValue) extends RequestOrResponse(Some(RequestKeys.MetadataKey)){ + assert(versionId == 0 || versionId == 1, + "Version " + versionId + " is invalid for TopicMetadataRequest. 
Valid versions are 0 or 1.") def this(topics: Seq[String], correlationId: Int) = this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics) @@ -59,6 +72,11 @@ case class TopicMetadataRequest(val versionId: Short, buffer.putShort(versionId) buffer.putInt(correlationId) writeShortString(buffer, clientId) + + if(versionId == 1) { + if (createTopic) buffer.put(1.byteValue) else buffer.put(0.byteValue) + } + buffer.putInt(topics.size) topics.foreach(topic => writeShortString(buffer, topic)) } @@ -67,6 +85,7 @@ case class TopicMetadataRequest(val versionId: Short, 2 + /* version id */ 4 + /* correlation id */ shortStringLength(clientId) + /* client id */ + (if (versionId == 1) 1 else 0) + /* createTopic */ 4 + /* number of topics */ topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ } @@ -89,8 +108,9 @@ case class TopicMetadataRequest(val versionId: Short, topicMetadataRequest.append("; Version: " + versionId) topicMetadataRequest.append("; CorrelationId: " + correlationId) topicMetadataRequest.append("; ClientId: " + clientId) + topicMetadataRequest.append("; CreateTopic: " + createTopic) if(details) topicMetadataRequest.append("; Topics: " + topics.mkString(",")) topicMetadataRequest.toString() } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index ce7ede3..bea9ab7 100644 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -41,10 +41,11 @@ object ClientUtils extends Logging{ * @param producerConfig The producer's config * @return topic metadata response */ - def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { + def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, + correlationId: Int, createTopic: Boolean = true): TopicMetadataResponse = { var fetchMetaDataSucceeded: Boolean = false var i: Int = 0 - val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq) + val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq, createTopic) var topicMetadataResponse: TopicMetadataResponse = null var t: Throwable = null // shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the @@ -82,18 +83,18 @@ object ClientUtils extends Logging{ * @param clientId The client's identifier * @return topic metadata response */ - def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int, - correlationId: Int = 0): TopicMetadataResponse = { + def fetchTopicMetadataForNonProducer(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int, + correlationId: Int = 0, createTopic: Boolean = false): TopicMetadataResponse = { val props = new Properties() props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(",")) props.put("client.id", clientId) props.put("request.timeout.ms", timeoutMs.toString) val producerConfig = new ProducerConfig(props) - fetchTopicMetadata(topics, brokers, producerConfig, correlationId) + 
fetchTopicMetadata(topics, brokers, producerConfig, correlationId, createTopic) } /** - * Parse a list of broker urls in the form host1:port1, host2:port2, ... + * Parse a list of broker urls in the form host1:port1, host2:port2, ... */ def parseBrokerList(brokerListStr: String): Seq[Broker] = { val brokersStr = Utils.parseCsvList(brokerListStr) diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index b9e2bea..54a56b0 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -63,7 +63,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, trace("Partitions without leader %s".format(noLeaderPartitionSet)) val brokers = getAllBrokersInCluster(zkClient) - val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, + val topicsMetadata = ClientUtils.fetchTopicMetadataForNonProducer(noLeaderPartitionSet.map(m => m.topic).toSet, brokers, config.clientId, config.socketTimeoutMs, @@ -163,4 +163,4 @@ class ConsumerFetcherManager(private val consumerIdString: String, } } } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala index b0b7be1..0790dd4 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala @@ -26,12 +26,13 @@ import kafka.network.RequestChannel.Response class TopicMetadataRequest(val versionId: Short, val correlationId: Int, val clientId: String, - val topics: java.util.List[String]) + val topics: java.util.List[String], + val createTopic: Boolean = true) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) { val underlying: kafka.api.TopicMetadataRequest = { import scala.collection.JavaConversions._ - new 
kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String]) + new kafka.api.TopicMetadataRequest(versionId, correlationId, clientId, topics: mutable.Buffer[String], createTopic) } def this(topics: java.util.List[String]) = @@ -54,6 +55,7 @@ class TopicMetadataRequest(val versionId: Short, topicMetadataRequest.append("; Version: " + versionId) topicMetadataRequest.append("; CorrelationId: " + correlationId) topicMetadataRequest.append("; ClientId: " + clientId) + topicMetadataRequest.append("; createTopic: "+ createTopic) if(details) { topicMetadataRequest.append("; Topics: ") val topicIterator = topics.iterator() diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fd5f12e..f357058 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -268,11 +268,11 @@ class KafkaApis(val requestChannel: RequestChannel, "acksPending:%b, error: %d, startOffset: %d, requiredOffset: %d".format( acksPending, status.error, status.offset, requiredOffset) } - + case class ProduceResult(key: TopicAndPartition, start: Long, end: Long, error: Option[Throwable] = None) { - def this(key: TopicAndPartition, throwable: Throwable) = + def this(key: TopicAndPartition, throwable: Throwable) = this(key, -1L, -1L, Some(throwable)) - + def errorCode = error match { case None => ErrorMapping.NoError case Some(error) => ErrorMapping.codeFor(error.getClass.asInstanceOf[Class[Throwable]]) @@ -426,10 +426,10 @@ class KafkaApis(val requestChannel: RequestChannel, /** * Read from a single topic/partition at the given offset upto maxSize bytes */ - private def readMessageSet(topic: String, - partition: Int, + private def readMessageSet(topic: String, + partition: Int, offset: Long, - maxSize: Int, + maxSize: Int, fromReplicaId: Int): (MessageSet, Long) = { // check if the current broker is the leader for the partitions val localReplica = 
if(fromReplicaId == Request.DebuggingConsumerId) @@ -437,7 +437,7 @@ class KafkaApis(val requestChannel: RequestChannel, else replicaManager.getLeaderReplicaIfLocal(topic, partition) trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize)) - val maxOffsetOpt = + val maxOffsetOpt = if (Request.isValidBrokerId(fromReplicaId)) None else @@ -453,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel, } /** - * Service the offset request API + * Service the offset request API */ def handleOffsetRequest(request: RequestChannel.Request) { val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest] @@ -476,7 +476,7 @@ class KafkaApis(val requestChannel: RequestChannel, val hw = localReplica.highWatermark if (allOffsets.exists(_ > hw)) hw +: allOffsets.dropWhile(_ > hw) - else + else allOffsets } } @@ -500,19 +500,19 @@ class KafkaApis(val requestChannel: RequestChannel, val response = OffsetResponse(offsetRequest.correlationId, responseMap) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - + def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { logManager.getLog(topicAndPartition) match { - case Some(log) => + case Some(log) => fetchOffsetsBefore(log, timestamp, maxNumOffsets) - case None => + case None => if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime) Seq(0L) else Nil } } - + def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { val segsArray = log.logSegments.toArray var offsetTimeArray: Array[(Long, Long)] = null @@ -554,12 +554,12 @@ class KafkaApis(val requestChannel: RequestChannel, ret.toSeq.sortBy(- _) } - private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = { + private def getTopicMetadata(topics: Set[String], createTopic: Boolean = true): Seq[TopicMetadata] = { val topicResponses = 
metadataCache.getTopicMetadata(topics) if (topics.size > 0 && topicResponses.size != topics.size) { val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet val responsesForNonExistentTopics = nonExistentTopics.map { topic => - if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) { + if (topic == OffsetManager.OffsetsTopicName || (config.autoCreateTopicsEnable && createTopic)) { try { if (topic == OffsetManager.OffsetsTopicName) { AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, @@ -590,7 +590,7 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleTopicMetadataRequest(request: RequestChannel.Request) { val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest] - val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet) + val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet, metadataRequest.createTopic) val brokers = metadataCache.getAliveBrokers trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId) @@ -843,9 +843,8 @@ class KafkaApis(val requestChannel: RequestChannel, def recordDelayedFetchExpired(forFollower: Boolean) { val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics else aggregateNonFollowerFetchRequestMetrics - + metrics.expiredRequestMeter.mark() } } } - diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index 9c6064e..3e3279c 100644 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -57,7 +57,7 @@ object GetOffsetShell { .describedAs("ms") .ofType(classOf[java.lang.Integer]) .defaultsTo(1000) - + if(args.length == 0) 
CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting consumer offsets.") @@ -73,7 +73,7 @@ object GetOffsetShell { val nOffsets = options.valueOf(nOffsetsOpt).intValue val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() - val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadataForNonProducer(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) + "kafka-list-topic.sh to verify") diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index af47836..34099f8 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -92,7 +92,7 @@ object ReplicaVerificationTool extends Logging { .describedAs("ms") .ofType(classOf[java.lang.Long]) .defaultsTo(30 * 1000L) - + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.") @@ -117,7 +117,7 @@ object ReplicaVerificationTool extends Logging { // getting topic metadata info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) - val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) + val topicsMetadataResponse = ClientUtils.fetchTopicMetadataForNonProducer(Set[String](), metadataTargetBrokers, clientId, maxWaitMs) val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter( topicMetadata => if 
(topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false)) @@ -395,4 +395,4 @@ private class ReplicaFetcher(name: String, sourceBroker: Broker, topicAndPartiti verificationBarrier.await() debug("Done verification") } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala index 36314f4..519328a 100644 --- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala +++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -93,7 +93,7 @@ object SimpleConsumerShell extends Logging { "skip it instead of halt.") val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend", "If set, when the simple consumer reaches the end of the Log, it will stop, not waiting for new produced messages") - + if(args.length == 0) CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.") @@ -125,7 +125,7 @@ object SimpleConsumerShell extends Logging { // getting topic metadata info("Getting topic metatdata...") val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt)) - val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata + val topicsMetadata = ClientUtils.fetchTopicMetadataForNonProducer(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) { System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, 
topicsMetadata)) System.exit(1) diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 789e74c..15fd5bc 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -21,31 +21,26 @@ import org.scalatest.junit.JUnit3Suite import org.junit.Test import org.junit.Assert._ -import java.util.Random +import java.util.{Random, Properties} import java.lang.Integer import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException} -import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, ShutdownableThread, TestUtils} -import kafka.integration.KafkaServerTestHarness +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{ShutdownableThread, Utils, TestUtils} +import kafka.zk.ZooKeeperTestHarness import kafka.consumer.SimpleConsumer import org.apache.kafka.common.KafkaException import org.apache.kafka.clients.producer._ -class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarness { - private val producerBufferSize = 30000 - private val serverMessageMaxBytes = producerBufferSize/2 - - val numServers = 2 - val configs = - for(props <- TestUtils.createBrokerConfigs(numServers, false)) - yield new KafkaConfig(props) { - override val zkConnect = TestZKUtils.zookeeperConnect - override val autoCreateTopicsEnable = false - override val messageMaxBytes = serverMessageMaxBytes - } - +class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness { + private val brokerId1 = 0 + private val brokerId2 = 1 + private val ports = TestUtils.choosePorts(2) + private val (port1, port2) = (ports(0), ports(1)) + private var server1: KafkaServer = null + private var server2: KafkaServer = null + private var servers = List.empty[KafkaServer] private var consumer1: SimpleConsumer = null private var 
consumer2: SimpleConsumer = null @@ -55,19 +50,32 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes private var producer3: KafkaProducer = null private var producer4: KafkaProducer = null + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false) + props1.put("auto.create.topics.enable", "false") + props2.put("auto.create.topics.enable", "false") + private val config1 = new KafkaConfig(props1) + private val config2 = new KafkaConfig(props2) + private val brokerList = TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)) + + private val bufferSize = 2 * config1.messageMaxBytes + private val topic1 = "topic-1" private val topic2 = "topic-2" override def setUp() { super.setUp() + server1 = TestUtils.createServer(config1) + server2 = TestUtils.createServer(config2) + servers = List(server1,server2) // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "") - consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "") + consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "") - producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = producerBufferSize); - producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) - producer3 = TestUtils.createNewProducer(brokerList, acks = -1, blockOnBufferFull = false, bufferSize = producerBufferSize) + producer1 = TestUtils.createNewProducer(brokerList, acks = 0, blockOnBufferFull = false, bufferSize = bufferSize); + producer2 = TestUtils.createNewProducer(brokerList, acks = 1, blockOnBufferFull = false, bufferSize = bufferSize) + producer3 = TestUtils.createNewProducer(brokerList, acks = -1, 
blockOnBufferFull = false, bufferSize = bufferSize) } override def tearDown() { @@ -79,6 +87,9 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes if (producer3 != null) producer3.close if (producer4 != null) producer4.close + server1.shutdown; Utils.rm(server1.config.logDirs) + server2.shutdown; Utils.rm(server2.config.logDirs) + super.tearDown() } @@ -91,7 +102,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // send a too-large record - val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) + val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1)) assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset, -1L) } @@ -104,7 +115,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // send a too-large record - val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1)) + val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](config1.messageMaxBytes + 1)) intercept[ExecutionException] { producer2.send(record).get } @@ -138,7 +149,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes TestUtils.createTopic(zkClient, topic1, 1, 2, servers) // producer with incorrect broker list - producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize) + producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = bufferSize) // send a record with incorrect broker list val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes) @@ -164,7 +175,8 @@ class ProducerFailureHandlingTest 
extends JUnit3Suite with KafkaServerTestHarnes // stop IO threads and request handling, but leave networking operational // any requests should be accepted and queue up, but not handled - servers.foreach(server => server.requestHandlerPool.shutdown()) + server1.requestHandlerPool.shutdown() + server2.requestHandlerPool.shutdown() producer1.send(record1).get(5000, TimeUnit.MILLISECONDS) @@ -174,11 +186,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes // TODO: expose producer configs after creating them // send enough messages to get buffer full - val tooManyRecords = 10 - val msgSize = producerBufferSize / tooManyRecords + val msgSize = 10000 val value = new Array[Byte](msgSize) new Random().nextBytes(value) val record2 = new ProducerRecord(topic1, null, "key".getBytes, value) + val tooManyRecords = bufferSize / ("key".getBytes.length + value.length) intercept[KafkaException] { for (i <- 1 to tooManyRecords) @@ -257,13 +269,17 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes // rolling bounce brokers for (i <- 0 until 2) { - for (server <- servers) { - server.shutdown() - server.awaitShutdown() - server.startup + server1.shutdown() + server1.awaitShutdown() + server1.startup - Thread.sleep(2000) - } + Thread.sleep(2000) + + server2.shutdown() + server2.awaitShutdown() + server2.startup + + Thread.sleep(2000) // Make sure the producer do not see any exception // in returned metadata due to broker failures @@ -282,7 +298,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes // double check that the leader info has been propagated after consecutive bounces val leader = TestUtils.waitUntilMetadataIsPropagated(servers, topic1, partition) - val fetchResponse = if(leader == configs(0).brokerId) { + val fetchResponse = if(leader == server1.config.brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, 
partition) } else { consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) @@ -301,7 +317,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with KafkaServerTestHarnes var sent = 0 var failed = false - val producer = TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10) + val producer = TestUtils.createNewProducer(brokerList, bufferSize = bufferSize, retries = 10) override def doWork(): Unit = { val responses = diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala index d407af9..34a7db4 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala @@ -17,6 +17,7 @@ package kafka.api.test +import java.util.Properties import java.lang.{Integer, IllegalArgumentException} import org.apache.kafka.clients.producer._ @@ -24,41 +25,53 @@ import org.scalatest.junit.JUnit3Suite import org.junit.Test import org.junit.Assert._ -import kafka.server.KafkaConfig -import kafka.utils.{TestZKUtils, TestUtils} +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.{Utils, TestUtils} +import kafka.zk.ZooKeeperTestHarness import kafka.consumer.SimpleConsumer import kafka.api.FetchRequestBuilder import kafka.message.Message -import kafka.integration.KafkaServerTestHarness -class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { - val numServers = 2 - val configs = - for(props <- TestUtils.createBrokerConfigs(numServers, false)) - yield new KafkaConfig(props) { - override val zkConnect = TestZKUtils.zookeeperConnect - override val numPartitions = 4 - } +class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness { + private val brokerId1 = 0 + private val brokerId2 = 1 + private val ports = TestUtils.choosePorts(2) + private val (port1, port2) = (ports(0), 
ports(1)) + private var server1: KafkaServer = null + private var server2: KafkaServer = null + private var servers = List.empty[KafkaServer] private var consumer1: SimpleConsumer = null private var consumer2: SimpleConsumer = null + private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false) + private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false) + props1.put("num.partitions", "4") + props2.put("num.partitions", "4") + private val config1 = new KafkaConfig(props1) + private val config2 = new KafkaConfig(props2) + private val topic = "topic" private val numRecords = 100 override def setUp() { super.setUp() + // set up 2 brokers with 4 partitions each + server1 = TestUtils.createServer(config1) + server2 = TestUtils.createServer(config2) + servers = List(server1,server2) // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", configs(0).port, 100, 1024*1024, "") - consumer2 = new SimpleConsumer("localhost", configs(1).port, 100, 1024*1024, "") + consumer1 = new SimpleConsumer("localhost", port1, 100, 1024*1024, "") + consumer2 = new SimpleConsumer("localhost", port2, 100, 1024*1024, "") } override def tearDown() { - consumer1.close() - consumer2.close() - + server1.shutdown + server2.shutdown + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) super.tearDown() } @@ -77,7 +90,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { */ @Test def testSendOffset() { - var producer = TestUtils.createNewProducer(brokerList) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) val callback = new CheckErrorCallback @@ -133,7 +146,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { */ @Test def testClose() { - var producer = TestUtils.createNewProducer(brokerList) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) 
try { // create topic @@ -169,7 +182,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { */ @Test def testSendToPartition() { - var producer = TestUtils.createNewProducer(brokerList) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) try { // create topic @@ -196,7 +209,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { } // make sure the fetched messages also respect the partitioning and ordering - val fetchResponse1 = if(leader1.get == configs(0).brokerId) { + val fetchResponse1 = if(leader1.get == server1.config.brokerId) { consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) } else { consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) @@ -224,7 +237,8 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness { */ @Test def testAutoCreateTopic() { - var producer = TestUtils.createNewProducer(brokerList, retries = 5) + var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)), + retries = 5) try { // Send a message to auto-create the topic diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 1bf2667..e3cfd8c 100644 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -109,7 +109,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers, "AddPartitionsTest-testIncrementPartitions", + val metadata = ClientUtils.fetchTopicMetadataForNonProducer(Set(topic1), brokers, 
"AddPartitionsTest-testIncrementPartitions", 2000,0).topicsMetadata val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1)) val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata @@ -134,7 +134,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas", + val metadata = ClientUtils.fetchTopicMetadataForNonProducer(Set(topic2), brokers, "AddPartitionsTest-testManualAssignmentOfReplicas", 2000,0).topicsMetadata val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2)) val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata @@ -158,7 +158,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5) TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement", + val metadata = ClientUtils.fetchTopicMetadataForNonProducer(Set(topic3), brokers, "AddPartitionsTest-testReplicaPlacement", 2000,0).topicsMetadata val metaDataForTopic3 = metadata.filter(p => p.topic.equals(topic3)).head diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 3cf7c9b..194dd70 100644 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -30,13 +30,11 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness { val configs: List[KafkaConfig] var servers: List[KafkaServer] = null - var brokerList: String = 
null override def setUp() { super.setUp if(configs.size <= 0) throw new KafkaException("Must suply at least one server config.") - brokerList = TestUtils.getBrokerListStrFromConfigs(configs) servers = configs.map(TestUtils.createServer(_)) } diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 35dc071..bbf5d0d 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -67,7 +67,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val topic = "test" createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) - var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", + var topicsMetadata = ClientUtils.fetchTopicMetadataForNonProducer(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", 2000,0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode) @@ -87,7 +87,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) // issue metadata request with empty list of topics - var topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata", + var topicsMetadata = ClientUtils.fetchTopicMetadataForNonProducer(Set.empty, brokers, "TopicMetadataTest-testGetAllTopicMetadata", 2000, 0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) assertEquals(2, topicsMetadata.size) @@ -106,8 +106,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { def testAutoCreateTopic { // auto create topic val topic = "testAutoCreateTopic" - var 
topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic", - 2000,0).topicsMetadata + var topicsMetadata = ClientUtils.fetchTopicMetadataForNonProducer(Set(topic),brokers,"TopicMetadataTest-testAutoCreateTopic", + 2000,0,true).topicsMetadata assertEquals(ErrorMapping.LeaderNotAvailableCode, topicsMetadata.head.errorCode) assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size) assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic) @@ -118,7 +118,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0) // retry the metadata for the auto created topic - topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", + topicsMetadata = ClientUtils.fetchTopicMetadataForNonProducer(Set(topic),brokers,"TopicMetadataTest-testBasicTopicMetadata", 2000,0).topicsMetadata assertEquals(ErrorMapping.NoError, topicsMetadata.head.errorCode) assertEquals(ErrorMapping.NoError, topicsMetadata.head.partitionsMetadata.head.errorCode) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 4d01d25..3faa884 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -385,7 +385,7 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) - producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") + producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000") producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") return new KafkaProducer(producerProps) }