From a4909994283897c73ca6ceee68737306d18b81a4 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 23 Oct 2013 14:04:53 -0700 Subject: [PATCH 01/10] Added clientHostName and clientPort config params --- core/src/main/scala/kafka/server/KafkaConfig.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 74442b6..0f42e17 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -71,6 +71,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* hostname of broker. If this is set, it will only bind to this address. If this is not set, * it will bind to all interfaces, and publish one to ZK */ val hostName: String = props.getString("host.name", null) + + /* Host name or IP to publish to ZooKeeper for clients to use. If this is not set, + * it will publish the same hostname that the broker binds to. */ + val clientHostName: String = props.getString("client.host.name", hostName) + + /* The port number to publish to ZooKeeper for clients to use. If this is not set, + * it will publish the same port that the broker binds to. */ + val clientPort: Int = props.getInt("client.port", port) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) -- 1.7.12 From 9fc6d0b5b7ddb6b8cfa3876085fcf69fc4242912 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 23 Oct 2013 14:12:09 -0700 Subject: [PATCH 02/10] Server publishing client host and port to ZK --- core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 14 +++++++------- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 84ea17a..1b06745 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -27,14 +27,14 @@ import java.net.InetAddress /** * This class registers the broker in zookeeper to allow * other brokers and consumers to detect failures. It uses an ephemeral znode with the path: - * /brokers/[0...N] --> host:port + * /brokers/[0...N] --> clientHost:clientPort * * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise * we are dead. */ class KafkaHealthcheck(private val brokerId: Int, - private val host: String, - private val port: Int, + private val clientHost: String, + private val clientPort: Int, private val zkSessionTimeoutMs: Int, private val zkClient: ZkClient) extends Logging { @@ -49,13 +49,13 @@ class KafkaHealthcheck(private val brokerId: Int, * Register this broker as "alive" in zookeeper */ def register() { - val hostName = - if(host == null || host.trim.isEmpty) + val clientHostName = + if(clientHost == null || clientHost.trim.isEmpty) InetAddress.getLocalHost.getCanonicalHostName else - host + clientHost val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, brokerId, hostName, port, zkSessionTimeoutMs, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, brokerId, clientHostName, clientPort, zkSessionTimeoutMs, jmxPort) } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 5e35a89..872b0d6 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -99,7 +99,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigManager.startup() /* tell everyone we are alive */ - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.hostName, config.port, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.clientHostName, config.clientPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() -- 1.7.12 From 916c8832b4485d6f7bc92cfc8423d82332e3df58 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 23 Oct 2013 14:34:43 -0700 Subject: [PATCH 03/10] Added new params to sample config --- config/server.properties | 13 ++++++++++--- core/src/main/scala/kafka/server/KafkaConfig.scala | 13 +++++++------ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/config/server.properties b/config/server.properties index 2eccc5e..563d519 100644 --- a/config/server.properties +++ b/config/server.properties @@ -24,11 +24,18 @@ broker.id=0 # The port the socket server listens on port=9092 -# Hostname the broker will bind to and advertise to producers and consumers. -# If not set, the server will bind to all interfaces and advertise the value returned from -# from java.net.InetAddress.getCanonicalHostName(). +# Hostname the broker will bind to. If not set, the server will bind to all interfaces #host.name=localhost +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#client.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#client.host.port= + # The number of threads handling network requests num.network.threads=2 diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0f42e17..549b38a 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -69,16 +69,17 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val port: Int = props.getInt("port", 6667) /* hostname of broker. If this is set, it will only bind to this address. If this is not set, - * it will bind to all interfaces, and publish one to ZK */ + * it will bind to all interfaces */ val hostName: String = props.getString("host.name", null) - /* Host name or IP to publish to ZooKeeper for clients to use. If this is not set, - * it will publish the same hostname that the broker binds to. */ - val clientHostName: String = props.getString("client.host.name", hostName) + /* hostname to publish to ZooKeeper for clients to use. If this is not set, + * it will use the value for "host.name" if configured. Otherwise + * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */ + val clientHostName: String = props.getString("advertise.host.name", hostName) - /* The port number to publish to ZooKeeper for clients to use. If this is not set, + /* the port to publish to ZooKeeper for clients to use. If this is not set, * it will publish the same port that the broker binds to. */ - val clientPort: Int = props.getInt("client.port", port) + val clientPort: Int = props.getInt("advertise.port", port) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) -- 1.7.12 From de36ab012a541610627ee48208cddceea9445693 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 23 Oct 2013 14:39:48 -0700 Subject: [PATCH 04/10] Renamed params to advertise.host.name and advertise.port --- config/server.properties | 4 ++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++-- core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 14 +++++++------- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/config/server.properties b/config/server.properties index 563d519..326e3b7 100644 --- a/config/server.properties +++ b/config/server.properties @@ -30,11 +30,11 @@ port=9092 # Hostname the broker will advertise to producers and consumers. If not set, it uses the # value for "host.name" if configured. Otherwise, it will use the value returned from # java.net.InetAddress.getCanonicalHostName(). -#client.host.name= +#advertise.host.name= # The port to publish to ZooKeeper for clients to use. If this is not set, # it will publish the same port that the broker binds to. -#client.host.port= +#advertise.port= # The number of threads handling network requests num.network.threads=2 diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 549b38a..3966190 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -75,11 +75,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* hostname to publish to ZooKeeper for clients to use. If this is not set, * it will use the value for "host.name" if configured. Otherwise * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */ - val clientHostName: String = props.getString("advertise.host.name", hostName) + val advertiseHostName: String = props.getString("advertise.host.name", hostName) /* the port to publish to ZooKeeper for clients to use. If this is not set, * it will publish the same port that the broker binds to. */ - val clientPort: Int = props.getInt("advertise.port", port) + val advertisePort: Int = props.getInt("advertise.port", port) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 1b06745..b940274 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -27,14 +27,14 @@ import java.net.InetAddress /** * This class registers the broker in zookeeper to allow * other brokers and consumers to detect failures. It uses an ephemeral znode with the path: - * /brokers/[0...N] --> clientHost:clientPort + * /brokers/[0...N] --> advertiseHost:advertisePort * * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise * we are dead. */ class KafkaHealthcheck(private val brokerId: Int, - private val clientHost: String, - private val clientPort: Int, + private val advertiseHost: String, + private val advertisePort: Int, private val zkSessionTimeoutMs: Int, private val zkClient: ZkClient) extends Logging { @@ -49,13 +49,13 @@ class KafkaHealthcheck(private val brokerId: Int, * Register this broker as "alive" in zookeeper */ def register() { - val clientHostName = - if(clientHost == null || clientHost.trim.isEmpty) + val advertiseHostName = + if(advertiseHost == null || advertiseHost.trim.isEmpty) InetAddress.getLocalHost.getCanonicalHostName else - clientHost + advertiseHost val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, brokerId, clientHostName, clientPort, zkSessionTimeoutMs, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, brokerId, advertiseHostName, advertisePort, zkSessionTimeoutMs, jmxPort) } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 872b0d6..ed957b8 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -99,7 +99,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigManager.startup() /* tell everyone we are alive */ - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.clientHostName, config.clientPort, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertiseHostName, config.advertisePort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() -- 1.7.12 From d62182a446b65424f9bde6f81e67d71e725c89a0 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 23 Oct 2013 14:43:27 -0700 Subject: [PATCH 05/10] Additional description of advertise.host.name and advertise.port properties --- core/src/main/scala/kafka/server/KafkaConfig.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3966190..d79dfb8 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -72,12 +72,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro * it will bind to all interfaces */ val hostName: String = props.getString("host.name", null) - /* hostname to publish to ZooKeeper for clients to use. If this is not set, + /* hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may + * need to be different from the interface to which the broker binds. If this is not set, * it will use the value for "host.name" if configured. Otherwise * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */ val advertiseHostName: String = props.getString("advertise.host.name", hostName) - /* the port to publish to ZooKeeper for clients to use. If this is not set, + /* the port to publish to ZooKeeper for clients to use. In IaaS environments, this may + * need to be different from the port to which the broker binds. If this is not set, * it will publish the same port that the broker binds to. */ val advertisePort: Int = props.getInt("advertise.port", port) -- 1.7.12 From 3f4f7419742b1c52c637ed989c95fb15efed8017 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Mon, 28 Oct 2013 11:00:33 -0700 Subject: [PATCH 06/10] Added unit tests for KafkaConfig --- .../scala/unit/kafka/server/KafkaConfigTest.scala | 34 ++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2f75e1d..fee86ae 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -64,4 +64,34 @@ class KafkaConfigTest extends JUnit3Suite { } -} \ No newline at end of file + @Test + def testAdvertiseDefaults() { + val port = 9999 + val hostName = "fake-host" + + val props = TestUtils.createBrokerConfig(0, port) + props.put("host.name", hostName) + + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.advertiseHostName, hostName) + assertEquals(serverConfig.advertisePort, port) + } + + @Test + def testAdvertiseConfigured() { + val port = 9999 + val advertiseHostName = "routable-host" + val advertisePort = 1234 + + val props = TestUtils.createBrokerConfig(0, port) + props.put("advertise.host.name", advertiseHostName) + props.put("advertise.port", advertisePort.toString) + + val serverConfig = new KafkaConfig(props) + + assertEquals(serverConfig.advertiseHostName, advertiseHostName) + assertEquals(serverConfig.advertisePort, advertisePort) + } + +} -- 1.7.12 From cf3e6bfb93ea9e191f674f6664dad2d4c4e8bc7a Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Mon, 28 Oct 2013 14:23:42 -0700 Subject: [PATCH 07/10] Added unit test to verify advertised broker host name and port published to ZK --- .../unit/kafka/server/AdvertiseBrokerTest.scala | 35 ++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala new file mode 100644 index 0000000..7d24113 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -0,0 +1,35 @@ +package kafka.server + +import org.scalatest.junit.JUnit3Suite +import kafka.zk.ZooKeeperTestHarness +import junit.framework.Assert._ +import kafka.utils.{ZkUtils, Utils, TestUtils} + +class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { + var server : KafkaServer = null + val brokerId = 0 + val advertiseHostName = "routable-host" + val advertisePort = 1234 + + override def setUp() { + super.setUp() + val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) + props.put("advertise.host.name", advertiseHostName) + props.put("advertise.port", advertisePort.toString) + + server = TestUtils.createServer(new KafkaConfig(props)) + } + + override def tearDown() { + server.shutdown() + Utils.rm(server.config.logDirs) + super.tearDown() + } + + def testBrokerAdvertiseToZK { + val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId) + assertEquals(advertiseHostName, brokerInfo.get.host) + assertEquals(advertisePort, brokerInfo.get.port) + } + +} \ No newline at end of file -- 1.7.12 From b5d1f20b738e6ba052e5d0bb8fdc973a5303a4d6 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 30 Oct 2013 11:06:11 -0700 Subject: [PATCH 08/10] Renamed properties to advertised.host.name and advertised.port --- config/server.properties | 4 ++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 ++-- core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 14 +++++++------- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- .../scala/unit/kafka/server/AdvertiseBrokerTest.scala | 12 ++++++------ .../test/scala/unit/kafka/server/KafkaConfigTest.scala | 16 ++++++++-------- 6 files changed, 26 insertions(+), 26 deletions(-) diff --git a/config/server.properties b/config/server.properties index 326e3b7..8efa83f 100644 --- a/config/server.properties +++ b/config/server.properties @@ -30,11 +30,11 @@ port=9092 # Hostname the broker will advertise to producers and consumers. If not set, it uses the # value for "host.name" if configured. Otherwise, it will use the value returned from # java.net.InetAddress.getCanonicalHostName(). -#advertise.host.name= +#advertised.host.name= # The port to publish to ZooKeeper for clients to use. If this is not set, # it will publish the same port that the broker binds to. -#advertise.port= +#advertised.port= # The number of threads handling network requests num.network.threads=2 diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d79dfb8..b324344 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -76,12 +76,12 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro * need to be different from the interface to which the broker binds. If this is not set, * it will use the value for "host.name" if configured. Otherwise * it will use the value returned from java.net.InetAddress.getCanonicalHostName(). */ - val advertiseHostName: String = props.getString("advertise.host.name", hostName) + val advertisedHostName: String = props.getString("advertised.host.name", hostName) /* the port to publish to ZooKeeper for clients to use. In IaaS environments, this may * need to be different from the port to which the broker binds. If this is not set, * it will publish the same port that the broker binds to. */ - val advertisePort: Int = props.getInt("advertise.port", port) + val advertisedPort: Int = props.getInt("advertised.port", port) /* the SO_SNDBUFF buffer of the socket sever sockets */ val socketSendBufferBytes: Int = props.getInt("socket.send.buffer.bytes", 100*1024) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index b940274..9dca55c 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -27,14 +27,14 @@ import java.net.InetAddress /** * This class registers the broker in zookeeper to allow * other brokers and consumers to detect failures. It uses an ephemeral znode with the path: - * /brokers/[0...N] --> advertiseHost:advertisePort + * /brokers/[0...N] --> advertisedHost:advertisedPort * * Right now our definition of health is fairly naive. If we register in zk we are healthy, otherwise * we are dead. */ class KafkaHealthcheck(private val brokerId: Int, - private val advertiseHost: String, - private val advertisePort: Int, + private val advertisedHost: String, + private val advertisedPort: Int, private val zkSessionTimeoutMs: Int, private val zkClient: ZkClient) extends Logging { @@ -49,13 +49,13 @@ class KafkaHealthcheck(private val brokerId: Int, * Register this broker as "alive" in zookeeper */ def register() { - val advertiseHostName = - if(advertiseHost == null || advertiseHost.trim.isEmpty) + val advertisedHostName = + if(advertisedHost == null || advertisedHost.trim.isEmpty) InetAddress.getLocalHost.getCanonicalHostName else - advertiseHost + advertisedHost val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - ZkUtils.registerBrokerInZk(zkClient, brokerId, advertiseHostName, advertisePort, zkSessionTimeoutMs, jmxPort) + ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort) } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index ed957b8..5e34f95 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -99,7 +99,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg topicConfigManager.startup() /* tell everyone we are alive */ - kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertiseHostName, config.advertisePort, config.zkSessionTimeoutMs, zkClient) + kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala index 7d24113..bc76150 100644 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -8,14 +8,14 @@ import kafka.utils.{ZkUtils, Utils, TestUtils} class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { var server : KafkaServer = null val brokerId = 0 - val advertiseHostName = "routable-host" - val advertisePort = 1234 + val advertisedHostName = "routable-host" + val advertisedPort = 1234 override def setUp() { super.setUp() val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort()) - props.put("advertise.host.name", advertiseHostName) - props.put("advertise.port", advertisePort.toString) + props.put("advertised.host.name", advertisedHostName) + props.put("advertised.port", advertisedPort.toString) server = TestUtils.createServer(new KafkaConfig(props)) } @@ -28,8 +28,8 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness { def testBrokerAdvertiseToZK { val brokerInfo = ZkUtils.getBrokerInfo(zkClient, brokerId) - assertEquals(advertiseHostName, brokerInfo.get.host) - assertEquals(advertisePort, brokerInfo.get.port) + assertEquals(advertisedHostName, brokerInfo.get.host) + assertEquals(advertisedPort, brokerInfo.get.port) } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index fee86ae..89c207a 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -74,24 +74,24 @@ class KafkaConfigTest extends JUnit3Suite { val serverConfig = new KafkaConfig(props) - assertEquals(serverConfig.advertiseHostName, hostName) - assertEquals(serverConfig.advertisePort, port) + assertEquals(serverConfig.advertisedHostName, hostName) + assertEquals(serverConfig.advertisedPort, port) } @Test def testAdvertiseConfigured() { val port = 9999 - val advertiseHostName = "routable-host" - val advertisePort = 1234 + val advertisedHostName = "routable-host" + val advertisedPort = 1234 val props = TestUtils.createBrokerConfig(0, port) - props.put("advertise.host.name", advertiseHostName) - props.put("advertise.port", advertisePort.toString) + props.put("advertised.host.name", advertisedHostName) + props.put("advertised.port", advertisedPort.toString) val serverConfig = new KafkaConfig(props) - assertEquals(serverConfig.advertiseHostName, advertiseHostName) - assertEquals(serverConfig.advertisePort, advertisePort) + assertEquals(serverConfig.advertisedHostName, advertisedHostName) + assertEquals(serverConfig.advertisedPort, advertisedPort) } } -- 1.7.12 From d4e8a683de5fa231eed0b9ed8ffdffe324d518f4 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 30 Oct 2013 13:05:21 -0700 Subject: [PATCH 09/10] Fixed issue where ProducerTest need to override new param --- core/src/test/scala/unit/kafka/producer/ProducerTest.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 2fb059b..a6584d9 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -51,11 +51,13 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) private val config1 = new KafkaConfig(props1) { override val hostName = "localhost" + override val advertisedHostName = "localhost" override val numPartitions = 4 } private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) private val config2 = new KafkaConfig(props2) { override val hostName = "localhost" + override val advertisedHostName = "localhost" override val numPartitions = 4 } -- 1.7.12 From 1a574fc50e2255ec486e539476b9465896586050 Mon Sep 17 00:00:00 2001 From: Roger Hoover Date: Wed, 30 Oct 2013 14:29:23 -0700 Subject: [PATCH 10/10] A better way to fix the ProducerTest --- core/src/test/scala/unit/kafka/producer/ProducerTest.scala | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index a6584d9..4b2e4ad 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -49,17 +49,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ private var servers = List.empty[KafkaServer] private val props1 = TestUtils.createBrokerConfig(brokerId1, port1) - private val config1 = new KafkaConfig(props1) { - override val hostName = "localhost" - override val advertisedHostName = "localhost" - override val numPartitions = 4 - } + props1.put("num.partitions", "4") + private val config1 = new KafkaConfig(props1) private val props2 = TestUtils.createBrokerConfig(brokerId2, port2) - private val config2 = new KafkaConfig(props2) { - override val hostName = "localhost" - override val advertisedHostName = "localhost" - override val numPartitions = 4 - } + props2.put("num.partitions", "4") + private val config2 = new KafkaConfig(props2) override def setUp() { super.setUp() -- 1.7.12