diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ index 4b3ef68..0be8b3e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -57,6 +57,7 @@ public class ConsumerNetworkClient implements Closeable { // flag and the request completion queue below). private final Logger log; private final KafkaClient client; + private boolean disconnecting = false; private final UnsentRequests unsent = new UnsentRequests(); private final Metadata metadata; private final Time time; @@ -83,6 +84,7 @@ public class ConsumerNetworkClient implements Closeable { this.log = logContext.logger(ConsumerNetworkClient.class); this.client = client; this.metadata = metadata; + this.disconnecting = false; this.time = time; this.retryBackoffMs = retryBackoffMs; this.maxPollTimeoutMs = Math.min(maxPollTimeoutMs, MAX_POLL_TIMEOUT_MS); @@ -388,6 +390,8 @@ public class ConsumerNetworkClient implements Closeable { public void disconnect(Node node) { synchronized (this) { + if (disconnecting) return; + disconnecting = true; failUnsentRequests(node, DisconnectException.INSTANCE); client.disconnect(node.idString()); } @@ -482,6 +486,7 @@ public class ConsumerNetworkClient implements Closeable { public void tryConnect(Node node) { synchronized (this) { client.ready(node, time.milliseconds()); + disconnecting = false; } }