diff --git core/src/main/scala/kafka/network/BlockingChannel.scala core/src/main/scala/kafka/network/BlockingChannel.scala index 6e2a38e..0bf915e 100644 --- core/src/main/scala/kafka/network/BlockingChannel.scala +++ core/src/main/scala/kafka/network/BlockingChannel.scala @@ -46,34 +46,36 @@ class BlockingChannel( val host: String, def connect() = lock synchronized { if(!connected) { - try { - channel = SocketChannel.open() - if(readBufferSize > 0) - channel.socket.setReceiveBufferSize(readBufferSize) - if(writeBufferSize > 0) - channel.socket.setSendBufferSize(writeBufferSize) - channel.configureBlocking(true) - channel.socket.setSoTimeout(readTimeoutMs) - channel.socket.setKeepAlive(true) - channel.socket.setTcpNoDelay(true) - channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs) + val addresses = java.net.InetAddress.getAllByName(host).toList + for ( addr <- addresses if !connected ) + try { + channel = SocketChannel.open() + if(readBufferSize > 0) + channel.socket.setReceiveBufferSize(readBufferSize) + if(writeBufferSize > 0) + channel.socket.setSendBufferSize(writeBufferSize) + channel.configureBlocking(true) + channel.socket.setSoTimeout(readTimeoutMs) + channel.socket.setKeepAlive(true) + channel.socket.setTcpNoDelay(true) + channel.socket.connect(new InetSocketAddress(addr, port), connectTimeoutMs) - writeChannel = channel - readChannel = Channels.newChannel(channel.socket().getInputStream) - connected = true - // settings may not match what we requested above - val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d." - debug(msg.format(channel.socket.getSoTimeout, - readTimeoutMs, - channel.socket.getReceiveBufferSize, - readBufferSize, - channel.socket.getSendBufferSize, - writeBufferSize, - connectTimeoutMs)) + writeChannel = channel + readChannel = Channels.newChannel(channel.socket().getInputStream) + connected = true + // settings may not match what we requested above + val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d." + debug(msg.format(channel.socket.getSoTimeout, + readTimeoutMs, + channel.socket.getReceiveBufferSize, + readBufferSize, + channel.socket.getSendBufferSize, + writeBufferSize, + connectTimeoutMs)) - } catch { - case e: Throwable => disconnect() - } + } catch { + case e: Throwable => disconnect() + } } }