From 586c7c83d01df5e27deaa61640adf0bfcacb4dff Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 23 Oct 2014 11:32:43 -0700 Subject: [PATCH 1/2] Fix KAFKA-1501 --- core/src/main/scala/kafka/network/SocketServer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index cee76b3..c83816e 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -253,6 +253,7 @@ private[kafka] class Acceptor(val host: String, serverChannel.configureBlocking(false) serverChannel.socket().setReceiveBufferSize(recvBufferSize) try { + serverChannel.socket.setReuseAddress(true) serverChannel.socket.bind(socketAddress) info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port)) } catch { -- 1.7.12.4 From c970e004338a655f5d9648a87ef8b0669dce24c6 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sun, 26 Oct 2014 15:55:42 -0700 Subject: [PATCH 2/2] v2 --- core/src/main/scala/kafka/network/SocketServer.scala | 4 ++-- core/src/test/scala/unit/kafka/utils/TestUtils.scala | 8 +++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index c83816e..3c2ccc2 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -251,9 +251,9 @@ private[kafka] class Acceptor(val host: String, new InetSocketAddress(host, port) val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false) - serverChannel.socket().setReceiveBufferSize(recvBufferSize) + serverChannel.socket.setReuseAddress(true) + serverChannel.socket.setReceiveBufferSize(recvBufferSize) try { - serverChannel.socket.setReuseAddress(true) serverChannel.socket.bind(socketAddress) info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port)) } catch { diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index dd3640f..644141c 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -68,7 +68,13 @@ object TestUtils extends Logging { def choosePorts(count: Int): List[Int] = { val sockets = for(i <- 0 until count) - yield new ServerSocket(0) + yield { + val socket = new ServerSocket() + // choose a random port with SO_RESUEADDR enabled + socket.setReuseAddress(true) + socket.bind(new InetSocketAddress(null.asInstanceOf[InetAddress], 0)) + socket + } val socketList = sockets.toList val ports = socketList.map(_.getLocalPort) socketList.map(_.close) -- 1.7.12.4