From 7ed636c7aadcc0436b2c0eacd59d38efbd62e37b Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 23 Oct 2013 14:04:53 -0700 Subject: [PATCH 1/7] Added clientHostName and clientPort config params --- core/src/main/scala/kafka/server/KafkaConfig.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 74442b6..0f42e17 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -71,6 +71,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* hostname of broker. If this is set, it will only bind to this address. If this is not set, * it will bind to all interfaces, and publish one to ZK */ val hostName: String = props.getString("host.name", null) + + /* Host name or IP to publish to ZooKeeper for clients to use. If this is not set, + * it will publish the same hostname that the broker binds to. */ + val clientHostName: String = props.getString("client.host.name", hostName) + + /* The port number to publish to ZooKeeper for clients to use. If this is not set, + * it will publish the same port that the broker binds to. */ + val clientPort: Int = props.getInt("client.port", port) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) -- 1.7.12 From d104c82c40cf1cc87048ccf4a98215769c65ce20 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 23 Oct 2013 14:12:09 -0700 Subject: [PATCH 2/7] Server publishing client host and port to ZK --- core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 14 +++++++------- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 84ea17a..1b06745 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -27,14 +27,14 @@ import java.net.InetAddress /** * This class registers the broker in zookeeper to allow * other brokers and consumers to detect failures. It uses an ephemeral znode with the path: - * /brokers/[0...N] --> host:port + * /brokers/[0...N] --> clientHost:clientPort * * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise * we are dead. */ class KafkaHealthcheck(private val brokerId: Int, - private val host: String, - private val port: Int, + private val clientHost: String, + private val clientPort: Int, private val zkSessionTimeoutMs: Int, private val zkClient: ZkClient) extends Logging { @@ -49,13 +49,13 @@ class KafkaHealthcheck(private val brokerId: Int, * Register this broker as "alive" in zookeeper */ def register() { - val hostName = - if(host == null || host.trim.isEmpty) + val clientHostName = + if(clientHost == null || clientHost.trim.isEmpty) InetAddress.getLocalHost.getCanonicalHostName else - host + clientHost val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, zkSessionTimeoutMs, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, brokerId, clientHostName, clientPort, zkSessionTimeoutMs, jmxPort) } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 5e35a89..872b0d6 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -99,7 +99,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigManager.startup() /* tell everyone we are alive */ - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.clientHostName, config.clientPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() -- 1.7.12 From 00bcc2014e701dc5dc9206068c42f6c910c4b8e9 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 23 Oct 2013 14:34:43 -0700 Subject: [PATCH 3/7] Added new params to sample config --- config/server.properties | 13 ++++++++++--- core/src/main/scala/kafka/server/KafkaConfig.scala | 13 +++++++------ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/config/server.properties b/config/server.properties index 2eccc5e..563d519 100644 --- a/config/server.properties +++ b/config/server.properties @@ -24,11 +24,18 @@ broker.id=0 # The port the socket server listens on port=9092 -# Hostname the broker will bind to and advertise to producers and consumers. -# If not set, the server will bind to all interfaces and advertise the value returned from -# from java.net.InetAddress.getCanonicalHostName(). +# Hostname the broker will bind to. If not set, the server will bind to all interfaces #host.name=localhost +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#client.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#client.host.port= + # The number of threads handling network requests num.network.threads=2 diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0f42e17..549b38a 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -69,16 +69,17 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val port: Int = props.getInt("port", 6667) /* hostname of broker. If this is set, it will only bind to this address. If this is not set, - * it will bind to all interfaces, and publish one to ZK */ + * it will bind to all interfaces */ val hostName: String = props.getString("host.name", null) - /* Host name or IP to publish to ZooKeeper for clients to use. If this is not set, - * it will publish the same hostname that the broker binds to. */ - val clientHostName: String = props.getString("client.host.name", hostName) + /* hostname to publish to ZooKeeper for clients to use. If this is not set, + * it will use the value for "host.name" if configured. Otherwise + * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */ + val clientHostName: String = props.getString("advertise.host.name", hostName) - /* The port number to publish to ZooKeeper for clients to use. If this is not set, + /* the port to publish to ZooKeeper for clients to use. If this is not set, * it will publish the same port that the broker binds to. */ - val clientPort: Int = props.getInt("client.port", port) + val clientPort: Int = props.getInt("advertise.port", port) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) -- 1.7.12 From 0482e4db20a9ccb478a3a6f066254976733460f0 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 23 Oct 2013 14:39:48 -0700 Subject: [PATCH 4/7] Renamed params to advertise.host.name and advertise.port --- config/server.properties | 4 ++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++-- core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 14 +++++++------- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/config/server.properties b/config/server.properties index 563d519..326e3b7 100644 --- a/config/server.properties +++ b/config/server.properties @@ -30,11 +30,11 @@ port=9092 # Hostname the broker will advertise to producers and consumers. If not set, it uses the # value for "host.name" if configured. Otherwise, it will use the value returned from # java.net.InetAddress.getCanonicalHostName(). -#client.host.name= +#advertise.host.name= # The port to publish to ZooKeeper for clients to use. If this is not set, # it will publish the same port that the broker binds to. -#client.host.port= +#advertise.port= # The number of threads handling network requests num.network.threads=2 diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 549b38a..3966190 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -75,11 +75,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* hostname to publish to ZooKeeper for clients to use. If this is not set, * it will use the value for "host.name" if configured. Otherwise * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */ - val clientHostName: String = props.getString("advertise.host.name", hostName) + val advertiseHostName: String = props.getString("advertise.host.name", hostName) /* the port to publish to ZooKeeper for clients to use. If this is not set, * it will publish the same port that the broker binds to. */ - val clientPort: Int = props.getInt("advertise.port", port) + val advertisePort: Int = props.getInt("advertise.port", port) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 1b06745..b940274 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -27,14 +27,14 @@ import java.net.InetAddress /** * This class registers the broker in zookeeper to allow * other brokers and consumers to detect failures. It uses an ephemeral znode with the path: - * /brokers/[0...N] --> clientHost:clientPort + * /brokers/[0...N] --> advertiseHost:advertisePort * * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise * we are dead. */ class KafkaHealthcheck(private val brokerId: Int, - private val clientHost: String, - private val clientPort: Int, + private val advertiseHost: String, + private val advertisePort: Int, private val zkSessionTimeoutMs: Int, private val zkClient: ZkClient) extends Logging { @@ -49,13 +49,13 @@ class KafkaHealthcheck(private val brokerId: Int, * Register this broker as "alive" in zookeeper */ def register() { - val clientHostName = - if(clientHost == null || clientHost.trim.isEmpty) + val advertiseHostName = + if(advertiseHost == null || advertiseHost.trim.isEmpty) InetAddress.getLocalHost.getCanonicalHostName else - clientHost + advertiseHost val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, brokerId, clientHostName, clientPort, zkSessionTimeoutMs, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, brokerId, advertiseHostName, advertisePort, zkSessionTimeoutMs, jmxPort) } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 872b0d6..ed957b8 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -99,7 +99,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigManager.startup() /* tell everyone we are alive */ - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.clientHostName, config.clientPort, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertiseHostName, config.advertisePort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() -- 1.7.12 From a1310cf172ac6a163d83b519b06f4ca3467259b8 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 23 Oct 2013 14:43:27 -0700 Subject: [PATCH 5/7] Additional description of advertise.host.name and advertise.port properties --- core/src/main/scala/kafka/server/KafkaConfig.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3966190..d79dfb8 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -72,12 +72,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro * it will bind to all interfaces */ val hostName: String = props.getString("host.name", null) - /* hostname to publish to ZooKeeper for clients to use. If this is not set, + /* hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may + * need to be different from the interface to which the broker binds. If this is not set, * it will use the value for "host.name" if configured. Otherwise * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */ val advertiseHostName: String = props.getString("advertise.host.name", hostName) - /* the port to publish to ZooKeeper for clients to use. If this is not set, + /* the port to publish to ZooKeeper for clients to use. In IaaS environments, this may + * need to be different from the port to which the broker binds. If this is not set, * it will publish the same port that the broker binds to. */ val advertisePort: Int = props.getInt("advertise.port", port) -- 1.7.12 From 27e87f4287a5e3f0b755b8756b7db6b8a436d830 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Mon, 28 Oct 2013 11:00:33 -0700 Subject: [PATCH 6/7] Added unit tests for KafkaConfig --- .../scala/unit/kafka/server/KafkaConfigTest.scala | 34 ++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2f75e1d..fee86ae 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -64,4 +64,34 @@ class KafkaConfigTest extends JUnit3Suite { } -} \ No newline at end of file + @Test + def testAdvertiseDefaults() { + val port = 9999 + val hostName = "fake-host" + + val props = TestUtils.createBrokerConfig(0, port) + props.put("host.name", hostName) + + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.advertiseHostName, hostName) + assertEquals(serverConfig.advertisePort, port) + } + + @Test + def testAdvertiseConfigured() { + val port = 9999 + val advertiseHostName = "routable-host" + val advertisePort = 1234 + + val props = TestUtils.createBrokerConfig(0, port) + props.put("advertise.host.name", advertiseHostName) + props.put("advertise.port", advertisePort.toString) + + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.advertiseHostName, advertiseHostName) + assertEquals(serverConfig.advertisePort, advertisePort) + } + +} -- 1.7.12 From 8c59515ebf32447a9f04edacdbd3b17bf2d935a1 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Mon, 28 Oct 2013 14:23:42 -0700 Subject: [PATCH 7/7] Added unit test to verify advertised broker host name and port published to ZK --- .../unit/kafka/server/AdvertiseBrokerTest.scala | 35 ++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala new file mode 100644 index 0000000..7d24113 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -0,0 +1,35 @@ +package kafka.server + +import org.scalatest.junit.JUnit3Suite +import kafka.zk.ZooKeeperTestHarness +import junit.framework.Assert._ +import kafka.utils.{ZkUtils, Utils, TestUtils} + +class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { + var server : KafkaServer = null + val brokerId = 0 + val advertiseHostName = "routable-host" + val advertisePort = 1234 + + override def setUp() { + super.setUp() + val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) + props.put("advertise.host.name", advertiseHostName) + props.put("advertise.port", advertisePort.toString) + + server = TestUtils.createServer(new KafkaConfig(props)) + } + + override def tearDown() { + server.shutdown() + Utils.rm(server.config.logDirs) + super.tearDown() + } + + def testBrokerAdvertiseToZK { + val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId) + assertEquals(advertiseHostName, brokerInfo.get.host) + assertEquals(advertisePort, brokerInfo.get.port) + } + +} \ No newline at end of file -- 1.7.12