diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 5d93965..5b801e4 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -114,6 +114,9 @@ public class Selector implements Selectable { */ @Override public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { + if (this.keys.containsKey(id)) + throw new IllegalStateException("There is already a connection for id " + id); + SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); Socket socket = channel.socket(); @@ -132,8 +135,6 @@ public class Selector implements Selectable { } SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT); key.attach(new Transmissions(id)); - if (this.keys.containsKey(key)) - throw new IllegalStateException("There is already a connection for id " + id); this.keys.put(id, key); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 99856e9..5c5e3d4 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -205,6 +205,12 @@ public class SelectorTest { assertEquals("", blockingRequest(node, "")); } + @Test(expected = IllegalStateException.class) + public void testExistingConnectionId() throws IOException { + blockingConnect(0); + blockingConnect(0); + } + private String blockingRequest(int node, String s) throws IOException { selector.poll(1000L, asList(createSend(node, s))); while (true) {