From c35271c8fd9a6d55459b1bea2b40cee1e33ced3c Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Tue, 23 Jun 2015 18:45:45 -0700 Subject: [PATCH] KAFKA-2298; Client Selector can drop connections on InvalidReceiveException without notifying NetworkClient --- .../org/apache/kafka/common/network/Selector.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 4aee214..6fec7f1 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -301,10 +301,8 @@ public class Selector implements Selectable { } /* cancel any defunct sockets */ - if (!key.isValid()) { + if (!key.isValid()) close(transmissions.id); - this.disconnected.add(transmissions.id); - } } catch (IOException e) { String desc = socketDescription(channel); if (e instanceof EOFException || e instanceof ConnectException) @@ -312,7 +310,6 @@ public class Selector implements Selectable { else log.warn("Error in I/O with connection to {}", desc, e); close(transmissions.id); - this.disconnected.add(transmissions.id); } } } @@ -395,7 +392,6 @@ public class Selector implements Selectable { log.trace("About to close the idle connection from " + connectionId + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis"); - disconnected.add(connectionId); close(connectionId); } } @@ -436,6 +432,7 @@ public class Selector implements Selectable { public void close(String id) { SelectionKey key = keyForId(id); lruConnections.remove(id); + disconnected.add(id); SocketChannel channel = channel(key); Transmissions trans = transmissions(key); if (trans != null) { -- 1.7.9.5