diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index e8c194c..87dd1a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -126,17 +126,17 @@ public class Sender implements Runnable { // get the list of partitions with data ready to send List ready = this.accumulator.ready(now); - // prune the list of ready topics to eliminate any that we aren't ready to send yet - List sendable = processReadyPartitions(cluster, ready, now); - // should we update our metadata? - List sends = new ArrayList(sendable.size()); + List sends = new ArrayList(); InFlightRequest metadataReq = maybeMetadataRequest(cluster, now); if (metadataReq != null) { sends.add(metadataReq.request); this.inFlightRequests.add(metadataReq); } + // prune the list of ready topics to eliminate any that we aren't ready to send yet + List sendable = processReadyPartitions(cluster, ready, now); + // create produce requests List batches = this.accumulator.drain(sendable, this.maxRequestSize); List requests = collate(cluster, batches); @@ -165,7 +165,11 @@ public class Sender implements Runnable { private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) { if (this.metadataFetchInProgress || !metadata.needsUpdate(now)) return null; - Node node = cluster.nextNode(); + + Node node = nextFreeNode(cluster); + if (node == null) + return null; + NodeState state = nodeState.get(node.id()); if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) { // we don't have a connection to this node right now, make one @@ -180,6 +184,18 @@ public class Sender implements Runnable { } /** + * @return A node with no requests currently being sent or null if no such node exists + */ + private Node nextFreeNode(Cluster cluster) { + for (int i = 0; i < cluster.nodes().size(); i++) { + Node node = cluster.nextNode(); + if (this.inFlightRequests.canSendMore(node.id())) + return node; + } + return null; + } + + /** * Start closing the sender (won't actually complete until all data is sent out) */ public void initiateClose() { diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties index d7d03ea..df572f2 100644 --- a/core/src/test/resources/log4j.properties +++ b/core/src/test/resources/log4j.properties @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=OFF, stdout +log4j.rootLogger=DEBUG, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout