diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 7e32509..7025b57 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -320,6 +320,11 @@ public class Selector implements Selectable, AutoCloseable {
         }
         sensors.close();
         channelBuilder.close();
+        // Close every channel still held in closingChannels, then empty the map.
+        for (KafkaChannel closingChannel : this.closingChannels.values()) {
+            doClose(closingChannel, false);
+        }
+        this.closingChannels.clear();
     }
 
     /**
@@ -804,6 +809,13 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     /**
+     * @return the number of Channels in closingChannels Map
+     */
+    int countOfClosingChannel() {
+        return closingChannels.size();
+    }
+
+    /**
      * Get the channel associated with selectionKey
      */
     private KafkaChannel channel(SelectionKey key) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 3bdb07a..1ffb439 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -76,6 +76,7 @@ public class SslSelectorTest extends SelectorTest {
     @After
     public void tearDown() throws Exception {
         this.selector.close();
+        assertEquals(0, selector.countOfClosingChannel());
         this.server.close();
         this.metrics.close();
     }