From e7014caead608c7b8fd42c39980903dce4a644c0 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 11 Jun 2015 17:59:24 -0700 Subject: [PATCH 1/2] KAFKA-2266; add dropped idle connections to the list of disconnects in Selector --- .../org/apache/kafka/common/network/Selector.java | 2 ++ .../apache/kafka/common/network/SelectorTest.java | 20 +++++++++++++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index effb1e6..1da215b 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -393,6 +393,8 @@ public class Selector implements Selectable { if (log.isTraceEnabled()) log.trace("About to close the idle connection from " + connectionId + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis"); + + disconnected.add(connectionId); close(connectionId); } } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index d23b4b6..7c53b5f 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -26,6 +26,7 @@ import java.util.*; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -40,13 +41,15 @@ public class SelectorTest { private static final int BUFFER_SIZE = 4 * 1024; private EchoServer server; + private Time time; private Selectable selector; @Before public void setup() throws Exception { this.server = new EchoServer(); this.server.start(); - this.selector = new Selector(5000, new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap()); + this.time = new MockTime(); + this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", new LinkedHashMap()); } @After @@ -244,6 +247,21 @@ public class SelectorTest { assertEquals("The response should be from the previously muted node", "1", selector.completedReceives().get(0).source()); } + + @Test + public void testCloseOldestConnection() throws Exception { + String id = "0"; + + InetSocketAddress addr = new InetSocketAddress("localhost", server.port); + selector.connect(id, addr, BUFFER_SIZE, BUFFER_SIZE); + selector.poll(1000); // Finish the connect + + time.sleep(6000); // The max idle time is 5000ms + selector.poll(0); + + assertTrue("The idle connection should have been closed", selector.disconnected().contains(id)); + } + private String blockingRequest(String node, String s) throws IOException { selector.send(createSend(node, s)); selector.poll(1000L); -- 2.3.2 (Apple Git-55) From 225a72f4a00641707bdf6adb3cd07d8aa81f4153 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Fri, 12 Jun 2015 09:16:37 -0700 Subject: [PATCH 2/2] KAFKA-2266; use blockingConnect function in idle disconnect test case --- .../src/test/java/org/apache/kafka/common/network/SelectorTest.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 7c53b5f..158f982 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -251,10 +251,7 @@ public class SelectorTest { @Test public void testCloseOldestConnection() throws Exception { String id = "0"; - - InetSocketAddress addr = new InetSocketAddress("localhost", server.port); - selector.connect(id, addr, BUFFER_SIZE, BUFFER_SIZE); - selector.poll(1000); // Finish the connect + blockingConnect(id); time.sleep(6000); // The max idle time is 5000ms selector.poll(0); -- 2.3.2 (Apple Git-55)