diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 5bc25de..662b295 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -28,6 +30,7 @@ import java.util.Map;
  *
  */
 final class ClusterConnectionStates {
+    private static final Logger log = LoggerFactory.getLogger(ClusterConnectionStates.class);
     private final long reconnectBackoffInitMs;
     private final long reconnectBackoffMaxMs;
     private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
@@ -106,7 +109,16 @@ final class ClusterConnectionStates {
      * @param now the current time
      */
     public void connecting(String id, long now) {
-        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now, this.reconnectBackoffInitMs));
+        // Keep the failure count of a node that is being reconnected from a non-CONNECTING
+        // state; an unknown node, or one already CONNECTING, starts again from zero.
+        long failedAttempts = 0;
+        NodeConnectionState previous = nodeState.get(id);
+        if (previous != null && previous.state != ConnectionState.CONNECTING) {
+            log.info("New connection for {} but current state is {}", id, previous.state);
+            failedAttempts = previous.failedAttempts;
+        }
+        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
+                this.reconnectBackoffInitMs, failedAttempts));
     }
 
     /**
@@ -265,11 +277,16 @@ final class ClusterConnectionStates {
         long failedAttempts;
         long reconnectBackoffMs;
 
-        public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs) {
+        /**
+         * Creates the state for a node. {@code failedAttempts} seeds the failure counter, so a
+         * caller can carry it over from an earlier state instead of restarting at zero.
+         */
+        public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,
+                long failedAttempts) {
             this.state = state;
             this.authenticationException = null;
             this.lastConnectAttemptMs = lastConnectAttempt;
-            this.failedAttempts = 0;
+            this.failedAttempts = failedAttempts;
             this.reconnectBackoffMs = reconnectBackoffMs;
         }
 