diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index e373265..541c5e1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -324,6 +324,7 @@ public class Sender implements Runnable { private void handleDisconnects(List disconnects, long now) { // clear out the in-flight requests for the disconnected broker for (int node : disconnects) { + nodeStates.disconnected(node); for (InFlightRequest request : this.inFlightRequests.clearAll(node)) { if (request.batches != null) { for (RecordBatch batch : request.batches.values()) { @@ -335,7 +336,6 @@ public class Sender implements Runnable { } } } - nodeStates.disconnected(request.request.destination()); } } // we got a disconnect so we should probably refresh our metadata and see if that broker is dead diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index f1e474c..633bdbf 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -306,6 +306,7 @@ public class Selector implements Selectable { key.cancel(); channel.socket().close(); channel.close(); + this.keys.remove(trans.id); } /**