diff --git a/config/server.properties b/config/server.properties index e92f599..36d8152 100644 --- a/config/server.properties +++ b/config/server.properties @@ -19,8 +19,9 @@ # The id of the broker. This must be set to a unique integer for each broker. brokerid=0 -# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned -# from InetAddress.getLocalHost(). If there are multiple interfaces getLocalHost +# Hostname the broker will bind to and advertise to consumers via Zookeeper. +# If not set, it will bind to all interfaces and advertise the value returned from +# from InetAddress.getCanonicalHostName(). If there are multiple interfaces getCanonicalHostName # may not be what you want. #hostname= diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 9cdadd7..684451b 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -32,6 +32,7 @@ import kafka.utils._ * M Handler threads that handle requests and produce responses back to the processor threads for writing. */ class SocketServer(val brokerId: Int, + val host: String, val port: Int, val numProcessorThreads: Int, val maxQueuedRequests: Int, @@ -39,7 +40,7 @@ class SocketServer(val brokerId: Int, this.logIdent = "[Socket Server on Broker " + brokerId + "], " private val time = SystemTime private val processors = new Array[Processor](numProcessorThreads) - private var acceptor: Acceptor = new Acceptor(port, processors) + private var acceptor: Acceptor = new Acceptor(host, port, processors) val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) /** @@ -123,15 +124,16 @@ private[kafka] abstract class AbstractServerThread extends Runnable with Logging /** * Thread that accepts and configures new connections. There is only need for one of these */ -private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread { +private[kafka] class Acceptor(val host: String, val port: Int, private val processors: Array[Processor]) extends AbstractServerThread { /** * Accept loop that checks for new connection attempts */ def run() { val serverChannel = ServerSocketChannel.open() + val socketAddress = createSocketAddress(host, port) serverChannel.configureBlocking(false) - serverChannel.socket.bind(new InetSocketAddress(port)) + serverChannel.socket.bind(socketAddress) serverChannel.register(selector, SelectionKey.OP_ACCEPT); info("Awaiting connections on port " + port) startupComplete() @@ -176,6 +178,16 @@ private[kafka] class Acceptor(val port: Int, private val processors: Array[Proce processor.accept(socketChannel) } + /* + * Build the InetSocketAddress based on the host. If the host is null, bind to all interfaces. + */ + private def createSocketAddress(host: String, port: Int) = { + if(host != null) + new InetSocketAddress(host, port) + else + new InetSocketAddress(port) + } + } /** diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 13b2484..d79a505 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -20,7 +20,6 @@ package kafka.server import java.util.Properties import kafka.message.Message import kafka.consumer.ConsumerConfig -import java.net.InetAddress import kafka.utils.{VerifiableProperties, ZKConfig, Utils} /** @@ -59,8 +58,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the port to listen and accept connections on */ val port: Int = props.getInt("port", 6667) - /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */ - val hostName: String = props.getString("hostname", InetAddress.getLocalHost.getHostAddress) + /* hostname of broker. If this is set, it will only bind to this address. If this is not set, + * it will bind to all interfaces, and publish one to ZK */ + val hostName: String = props.getString("hostname", null) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBuffer: Int = props.getInt("socket.send.buffer", 100*1024) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 725226a..5fd320c 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -62,6 +62,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg logManager.startup() socketServer = new SocketServer(config.brokerId, + config.hostName, config.port, config.numNetworkThreads, config.numQueuedRequests, diff --git a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala index 67a0be8..b0139f6 100644 --- a/core/src/main/scala/kafka/server/KafkaZooKeeper.scala +++ b/core/src/main/scala/kafka/server/KafkaZooKeeper.scala @@ -21,6 +21,7 @@ import kafka.utils._ import org.apache.zookeeper.Watcher.Event.KeeperState import org.I0Itec.zkclient.{IZkStateListener, ZkClient} import kafka.common._ +import java.net.InetAddress /** @@ -42,7 +43,9 @@ class KafkaZooKeeper(config: KafkaConfig) extends Logging { private def registerBrokerInZk() { info("Registering broker " + brokerIdPath) - val hostName = config.hostName + var hostName = config.hostName + if (hostName == null) + hostName = InetAddress.getLocalHost().getCanonicalHostName val creatorId = hostName + "-" + System.currentTimeMillis ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port) } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 9074ca8..81e5231 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -34,6 +34,7 @@ import kafka.message.ByteBufferMessageSet class SocketServerTest extends JUnitSuite { val server: SocketServer = new SocketServer(0, + host = null, port = TestUtils.choosePort, numProcessorThreads = 1, maxQueuedRequests = 50,