Description
`NetworkClient.close(node)` closes the connection to the node and removes it from `ClusterConnectionStates.nodeState`, but not from `ClusterConnectionStates.connectingNodes`. Subsequent `NetworkClient.poll()` invocations then throw `IllegalStateException`. This leaves the NetworkClient unusable until the node is either removed from `connectingNodes` or added back to `nodeState`. We don't use `NetworkClient.close(node)` in clients, but we do use it in the NetworkClient instances that brokers create for the replica fetcher and the controller. Brokers call `NetworkClientUtils.isReady()` before establishing connections, and `isReady()` invokes `poll()`, so in these cases the NetworkClient never recovers.
Exception stack trace:
java.lang.IllegalStateException: No entry found for connection 0
	at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:409)
	at org.apache.kafka.clients.ClusterConnectionStates.isConnectionSetupTimeout(ClusterConnectionStates.java:446)
	at org.apache.kafka.clients.ClusterConnectionStates.lambda$nodesWithConnectionSetupTimeout$0(ClusterConnectionStates.java:458)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174)
	at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at org.apache.kafka.clients.ClusterConnectionStates.nodesWithConnectionSetupTimeout(ClusterConnectionStates.java:459)
	at org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:807)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
	at org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42)
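The failure mode can be reproduced with a minimal Java sketch, assuming a heavily simplified model of the bookkeeping: `ConnectionStatesSketch`, its nested `States` class and the `removeFromConnecting` flag are hypothetical and are not Kafka APIs. The sketch keeps only the two structures named in the description. It iterates over `connectingNodes` and looks each id up in `nodeState`, as the stack trace shows `nodesWithConnectionSetupTimeout()` doing. Once a node has been removed from only one of the two structures, this lookup throws the same `IllegalStateException`. The state check inside the filter is a stand-in for the real connection-setup timeout check.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified model; NOT Kafka's ClusterConnectionStates.
public class ConnectionStatesSketch {

    enum State { CONNECTING, CONNECTED }

    static class States {
        final Map<String, State> nodeState = new HashMap<>();
        final Set<String> connectingNodes = new HashSet<>();
        // Hypothetical switch: false models the reported bug, true a possible fix.
        private final boolean removeFromConnecting;

        States(boolean removeFromConnecting) {
            this.removeFromConnecting = removeFromConnecting;
        }

        // Start connecting: the node is tracked in BOTH structures.
        void connecting(String id) {
            nodeState.put(id, State.CONNECTING);
            connectingNodes.add(id);
        }

        // Models what happens when the client closes a node.
        void remove(String id) {
            nodeState.remove(id);
            if (removeFromConnecting) {
                connectingNodes.remove(id); // keeps the two structures in sync
            }
        }

        // Lookup that fails when connectingNodes holds an id nodeState lacks.
        private State stateOf(String id) {
            State s = nodeState.get(id);
            if (s == null) {
                throw new IllegalStateException("No entry found for connection " + id);
            }
            return s;
        }

        // Same shape as the traced method: stream over connectingNodes and
        // filter by a per-node check (here just the state, standing in for
        // the real timeout test).
        List<String> nodesWithConnectionSetupTimeout() {
            return connectingNodes.stream()
                    .filter(id -> stateOf(id) == State.CONNECTING)
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        States buggy = new States(false);
        buggy.connecting("0");
        buggy.remove("0");
        try {
            buggy.nodesWithConnectionSetupTimeout();
        } catch (IllegalStateException e) {
            System.out.println("buggy: " + e.getMessage());
        }

        States fixed = new States(true);
        fixed.connecting("0");
        fixed.remove("0");
        System.out.println("fixed: " + fixed.nodesWithConnectionSetupTimeout());
    }
}
```

Running `main` prints `buggy: No entry found for connection 0` followed by `fixed: []`. Removing the id from both structures in the same call is one plausible fix direction. It is not necessarily the change that was committed for this issue.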