diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 9839632..40cfeec 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -100,6 +100,9 @@ public class Selector implements Selectable { */ @Override public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { + if (this.keys.containsKey(id)) + throw new IllegalStateException("There is already a connection for id " + id); + SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); Socket socket = channel.socket(); @@ -118,8 +121,6 @@ public class Selector implements Selectable { } SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT); key.attach(new Transmissions(id)); - if (this.keys.containsKey(key)) - throw new IllegalStateException("There is already a connection for id " + id); this.keys.put(id, key); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 90e2dcf..d02bdd7 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -213,6 +213,12 @@ public class SelectorTest { assertEquals("", blockingRequest(node, "")); } + @Test(expected = IllegalStateException.class) + public void testExistingConnectionId() throws IOException { + blockingConnect(0); + blockingConnect(0); + } + private String blockingRequest(int node, String s) throws IOException { selector.poll(1000L, asList(createSend(node, s))); while (true) { diff --git a/gradle.properties b/gradle.properties index 4827769..236e243 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,7 +15,7 @@ group=org.apache.kafka version=0.8.1 -scalaVersion=2.8.0 +scalaVersion=2.9.2 task=build mavenUrl=