diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 5bc25de..d28df05 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients;
 
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -28,6 +30,7 @@ import java.util.Map;
  *
  */
 final class ClusterConnectionStates {
+    private static final Logger log = LoggerFactory.getLogger(ClusterConnectionStates.class);
     private final long reconnectBackoffInitMs;
     private final long reconnectBackoffMaxMs;
     private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
@@ -106,7 +109,15 @@ final class ClusterConnectionStates {
      * @param now the current time
      */
     public void connecting(String id, long now) {
-        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs));
+        NodeConnectionState node = nodeState.get(id);
+        if (node != null) {
+            // Known node: update it in place so the rest of its recorded state is kept.
+            node.lastConnectAttemptMs = now;
+            node.state = ConnectionState.CONNECTING;
+        } else {
+            nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
+                this.reconnectBackoffInitMs));
+        }
     }
 
     /**
@@ -274,7 +285,7 @@ final class ClusterConnectionStates {
         }
 
         public String toString() {
-            return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
+            return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ")";
         }
     }
 }