diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 855ae84..7a9445b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -15,15 +15,7 @@ package org.apache.kafka.clients.producer.internals; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import org.apache.kafka.common.Cluster; @@ -97,6 +89,9 @@ public class Sender implements Runnable { /* the set of currently in-flight requests awaiting a response from the server */ private final InFlightRequests inFlightRequests; + /* a reference to the bootstrap Cluster instance */ + private final Cluster bootstrapCluster; + /* a reference to the current Cluster instance */ private final Metadata metadata; @@ -136,6 +131,7 @@ public class Sender implements Runnable { this.selector = selector; this.maxRequestSize = maxRequestSize; this.metadata = metadata; + this.bootstrapCluster = metadata.fetch(); this.clientId = clientId; this.running = true; this.requestTimeout = requestTimeout; @@ -240,7 +236,7 @@ public class Sender implements Runnable { if (this.metadataFetchInProgress || !metadata.needsUpdate(now)) return; - Node node = selectMetadataDestination(cluster); + Node node = selectMetadataDestination(cluster, now); if (node == null) return; @@ -258,31 +254,49 @@ public class Sender implements Runnable { } /** - * Find a good node to make a metadata request to. This method will first look for a node that has an existing + * Find a good node to make a metadata request to. It picks a node in the following order: + * 1. a random connected node with no outstanding requests + * 2. a random connect-able node + * 3. a random connected bootstrap node + * 4. a random connect-able bootstrap node + * 5. a random node that can send new requests + * + * This method will first look for a node that has an existing * connection and no outstanding requests. If there are no such nodes it will look for a node with no outstanding * requests. * @return A node with no requests currently being sent or null if no such node exists */ - private Node selectMetadataDestination(Cluster cluster) { - List nodes = cluster.nodes(); - - // first look for a node to which we are connected and have no outstanding requests - boolean connectionInProgress = false; - for (int i = 0; i < nodes.size(); i++) { - Node node = nodes.get(metadataNodeIndex(i, nodes.size())); - if (nodeStates.isConnected(node.id()) && this.inFlightRequests.canSendMore(node.id())) { - this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); - return node; - } else if (nodeStates.isConnecting(node.id())) { - connectionInProgress = true; + private Node selectMetadataDestination(Cluster cluster, long now) { + List clusterList = Arrays.asList(cluster, this.bootstrapCluster); + + for (Cluster curCluster : clusterList) { + List nodes = curCluster.nodes(); + + // first look for a node to which we are connected and have no outstanding requests + boolean connectionInProgress = false; + Node connectableNode = null; + for (int i = 0; i < nodes.size(); i++) { + Node node = nodes.get(metadataNodeIndex(i, nodes.size())); + if (nodeStates.isConnected(node.id()) && !this.inFlightRequests.hasOutstandingRequests(node.id())) { + this.metadataFetchNodeIndex = metadataNodeIndex(i + 1, nodes.size()); + return node; + } else if (nodeStates.isConnecting(node.id())) { + connectionInProgress = true; + } else if (connectableNode == null && nodeStates.canConnect(node.id(), now)) { + connectableNode = node; + } } - } - // if we have a connection that is being established now, just wait for that don't make another - if (connectionInProgress) - return null; + // if we have a connection that is being established now, just wait for that don't make another + if (connectionInProgress) + return null; + + if (connectableNode != null) + return connectableNode; + } // okay, no luck, pick a random unused node + List nodes = cluster.nodes(); for (int i = 0; i < nodes.size(); i++) { Node node = nodes.get(metadataNodeIndex(i, nodes.size())); if (this.inFlightRequests.canSendMore(node.id())) { @@ -736,6 +750,17 @@ public class Sender implements Runnable { } /** + * Are there outstanding requests on this node? + * + * @param node Node in question + * @return true iff there are no outstanding requests on the given node + */ + public boolean hasOutstandingRequests(int node) { + Deque queue = requests.get(node); + return queue == null || queue.isEmpty(); + } + + /** * Clear out all the in-flight requests for the given node and return them * * @param node The node