diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index f739279..9528289 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -122,16 +122,17 @@ public class NetworkClient implements KafkaClient { */ @Override public boolean isReady(Node node, long now) { - return isReady(node.id(), now); - } - - private boolean isReady(int node, long now) { + int nodeId = node.id(); if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0) // if we need to update our metadata now declare all requests unready to make metadata requests first priority return false; else // otherwise we are ready if we are connected and can send more requests - return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node); + return isReadyToSend(nodeId); + } + + private boolean isReadyToSend(int node) { + return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node); } /** @@ -146,21 +147,21 @@ public class NetworkClient implements KafkaClient { public List poll(List requests, long timeout, long now) { List sends = new ArrayList(); - // should we update our metadata? - long metadataTimeout = metadata.timeToNextUpdate(now); - if (!this.metadataFetchInProgress && metadataTimeout == 0) - maybeUpdateMetadata(sends, now); - for (int i = 0; i < requests.size(); i++) { ClientRequest request = requests.get(i); int nodeId = request.request().destination(); - if (!isReady(nodeId, now)) + if (!isReadyToSend(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); this.inFlightRequests.add(request); sends.add(request.request()); } + // should we update our metadata? + long metadataTimeout = metadata.timeToNextUpdate(now); + if (!this.metadataFetchInProgress && metadataTimeout == 0) + maybeUpdateMetadata(sends, now); + // do the I/O try { this.selector.poll(Math.min(timeout, metadataTimeout), sends); @@ -347,9 +348,12 @@ public class NetworkClient implements KafkaClient { */ private void maybeUpdateMetadata(List sends, long now) { Node node = this.leastLoadedNode(now); - if (node == null) + if (node == null) { + log.debug("Give up sending metadata request since no node is available"); return; + } + log.debug("Trying to send metadata request to node {}", node.id()); if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) { Set topics = metadata.topics(); this.metadataFetchInProgress = true; @@ -359,6 +363,7 @@ public class NetworkClient implements KafkaClient { this.inFlightRequests.add(metadataRequest); } else if (connectionStates.canConnect(node.id(), now)) { // we don't have a connection to this node right now, make one + log.debug("Give up sending metadata request to node {} since it is either not connected or cannot have more in flight requests", node.id()); initiateConnect(node, now); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index 140237f..4aa5b01 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -130,7 +130,7 @@ public final class Metadata { this.version += 1; this.cluster = cluster; notifyAll(); - log.debug("Updated cluster metadata to {}", cluster); + log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); } /** diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 4f06e34..555d751 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -116,19 +116,6 @@ object MirrorMaker extends Logging { val numStreams = options.valueOf(numStreamsOpt).intValue() val bufferSize = options.valueOf(bufferSizeOpt).intValue() - val useNewProducer = options.has(useNewProducerOpt) - val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) - - // create producer threads - val clientId = producerProps.getProperty("client.id", "") - val producers = (1 to numProducers).map(i => { - producerProps.setProperty("client.id", clientId + "-" + i) - if (useNewProducer) - new NewShinyProducer(producerProps) - else - new OldProducer(producerProps) - }) - // create consumer streams connectors = options.valuesOf(consumerConfigOpt).toList .map(cfg => new ConsumerConfig(Utils.loadProps(cfg))) @@ -138,8 +125,21 @@ object MirrorMaker extends Logging { // create a data channel btw the consumers and the producers val mirrorDataChannel = new DataChannel(bufferSize, numConsumers, numProducers) - producerThreads = producers.zipWithIndex.map(producerAndIndex => new ProducerThread(mirrorDataChannel, producerAndIndex._1, producerAndIndex._2)) + // create producer threads + val useNewProducer = options.has(useNewProducerOpt) + val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) + val clientId = producerProps.getProperty("client.id", "") + producerThreads = (0 until numProducers).map(i => { + producerProps.setProperty("client.id", clientId + "-" + i) + val producer = + if (useNewProducer) + new NewShinyProducer(producerProps) + else + new OldProducer(producerProps) + new ProducerThread(mirrorDataChannel, producer, i) + }) + // create consumer threads val filterSpec = if (options.has(whitelistOpt)) new Whitelist(options.valueOf(whitelistOpt)) else @@ -153,7 +153,7 @@ object MirrorMaker extends Logging { fatal("Unable to create stream - shutting down mirror maker.") connectors.foreach(_.shutdown) } - consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, producers, streamAndIndex._2)) + consumerThreads = streams.zipWithIndex.map(streamAndIndex => new ConsumerThread(streamAndIndex._1, mirrorDataChannel, streamAndIndex._2)) assert(consumerThreads.size == numConsumers) Runtime.getRuntime.addShutdownHook(new Thread() { @@ -233,7 +233,6 @@ object MirrorMaker extends Logging { class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]], mirrorDataChannel: DataChannel, - producers: Seq[BaseProducer], threadId: Int) extends Thread with Logging with KafkaMetricsGroup {