diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 9cdadd7..78a6e57 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -32,6 +32,7 @@ import kafka.utils._ * M Handler threads that handle requests and produce responses back to the processor threads for writing. */ class SocketServer(val brokerId: Int, + val host: String, val port: Int, val numProcessorThreads: Int, val maxQueuedRequests: Int, @@ -39,7 +40,7 @@ class SocketServer(val brokerId: Int, this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) - private var acceptor: Acceptor = new Acceptor(port, processors) + private var acceptor: Acceptor = new Acceptor(host, port, processors) val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) /** @@ -123,7 +124,7 @@ private[kafka] abstract class AbstractServerThread extends Runnable with Logging /** * Thread that accepts and configures new connections. There is only need for one of these */ -private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread { +private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor]) extends AbstractServerThread { /** * Accept loop that checks for new connection attempts @@ -131,7 +132,7 @@ private[kafka] class Acceptor(val port: Int, private val processors: Array[Proce def run() { val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false) - serverChannel.socket.bind(new InetSocketAddress(port)) + serverChannel.socket.bind(new InetSocketAddress(host, port)) serverChannel.register(selector, SelectionKey.OP_ACCEPT); info("Awaiting connections on port " + port) startupComplete() diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 725226a..5fd320c 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -62,6 +62,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg logManager.startup() socketServer = new SocketServer(config.brokerId, + config.hostName, config.port, config.numNetworkThreads, config.numQueuedRequests, diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 9074ca8..e6e7013 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -34,6 +34,7 @@ import kafka.message.ByteBufferMessageSet class SocketServerTest extends JUnitSuite { val server: SocketServer = new SocketServer(0, + host = "localhost", port = TestUtils.choosePort, numProcessorThreads = 1, maxQueuedRequests = 50, diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index aa58dce..f8bf3c5 100644 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -199,6 +199,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness { private def createBrokerConfig(nodeId: Int, port: Int): Properties = { val props = new Properties props.put("brokerid", nodeId.toString) + props.put("hostname", "localhost") props.put("port", port.toString) props.put("log.dir", getLogDir.getAbsolutePath) props.put("log.flush.interval", "1")