diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index e8c194c..1927ec9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -130,7 +130,7 @@ public class Sender implements Runnable { List sendable = processReadyPartitions(cluster, ready, now); // should we update our metadata? - List sends = new ArrayList(sendable.size()); + List sends = new ArrayList(); InFlightRequest metadataReq = maybeMetadataRequest(cluster, now); if (metadataReq != null) { sends.add(metadataReq.request); @@ -165,7 +165,11 @@ public class Sender implements Runnable { private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) { if (this.metadataFetchInProgress || !metadata.needsUpdate(now)) return null; - Node node = cluster.nextNode(); + + Node node = nextFreeNode(cluster); + if (node == null) + return null; + NodeState state = nodeState.get(node.id()); if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) { // we don't have a connection to this node right now, make one @@ -180,6 +184,18 @@ public class Sender implements Runnable { } /** + * @return A node with no requests currently being sent or null if no such node exists + */ + private Node nextFreeNode(Cluster cluster) { + for (int i = 0; i < cluster.nodes().size(); i++) { + Node node = cluster.nextNode(); + if (this.inFlightRequests.canSendMore(node.id())) + return node; + } + return null; + } + + /** * Start closing the sender (won't actually complete until all data is sent out) */ public void initiateClose() {