diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 7e32509..9753885 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -311,8 +311,11 @@ public class Selector implements Selectable, AutoCloseable {
     @Override
     public void close() {
         List<String> connections = new ArrayList<>(channels.keySet());
-        for (String id : connections)
+        for (String id : connections) {
+            // Log before close(id): the id is presumably no longer mapped in channels once it returns -- TODO confirm.
+            log.debug("Closing channel {} for connection {}", channels.get(id), id);
             close(id);
+        }
         try {
             this.nioSelector.close();
         } catch (IOException | SecurityException e) {
@@ -320,6 +323,12 @@ public class Selector implements Selectable, AutoCloseable {
         }
         sensors.close();
         channelBuilder.close();
+        // Close whatever is still held in closingChannels so that close() leaves the map empty.
+        for (KafkaChannel channel : this.closingChannels.values()) {
+            log.debug("Closing channel {} left in closingChannels", channel);
+            doClose(channel, false);
+        }
+        this.closingChannels.clear();
     }
 
     /**
@@ -664,7 +673,10 @@ public class Selector implements Selectable, AutoCloseable {
             boolean sendFailed = failedSends.remove(channel.id());
             if (deque == null || deque.isEmpty() || sendFailed) {
                 doClose(channel, true);
+                log.debug("Channel {} for {} closed and removed", channel, channel.id());
                 it.remove();
+            } else {
+                log.debug("Channel {} for {} not closed: its deque is non-empty and no send failed", channel, channel.id());
             }
         }
         for (String channel : this.failedSends)
@@ -804,6 +816,24 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     /**
+     * Returns the number of channels currently held in {@code closingChannels}.
+     *
+     * @return the size of the {@code closingChannels} map
+     */
+    int countOfClosingChannel() {
+        return closingChannels.size();
+    }
+
+    /**
+     * Returns the number of entries currently held in {@code explicitlyMutedChannels}.
+     *
+     * @return the size of {@code explicitlyMutedChannels}
+     */
+    int countOfMutedChannels() {
+        return explicitlyMutedChannels.size();
+    }
+
+    /**
      * Get the channel associated with selectionKey
      */
     private KafkaChannel channel(SelectionKey key) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index cfd7fb3..440cee2 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -95,6 +95,9 @@ public class SelectorTest {
             verifySelectorEmpty();
         } finally {
             this.selector.close();
+
+            assertEquals(0, this.selector.countOfClosingChannel());
+            assertEquals(0, this.selector.countOfMutedChannels());
             this.server.close();
             this.metrics.close();
         }
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 3bdb07a..cdd9bda 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -76,6 +76,8 @@ public class SslSelectorTest extends SelectorTest {
     @After
     public void tearDown() throws Exception {
         this.selector.close();
+        assertEquals(0, this.selector.countOfClosingChannel());
+        assertEquals(0, this.selector.countOfMutedChannels());
         this.server.close();
         this.metrics.close();
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index d70a448..4fd1d52 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -87,8 +87,11 @@ public class SslTransportLayerTest {
 
     @After
     public void teardown() throws Exception {
-        if (selector != null)
+        if (selector != null) {
             this.selector.close();
+            assertEquals(0, this.selector.countOfClosingChannel());
+            assertEquals(0, this.selector.countOfMutedChannels());
+        }
         if (server != null)
             this.server.close();
     }