From d6960c5b5461a0731e74141a6b0f138f7f9f36d5 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Sat, 21 Feb 2015 08:16:27 -0800 Subject: [PATCH 1/4] KAFKA-1971; starting a broker with a conflicting id will delete the previous broker registration --- core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 4acdd70..4886ae0 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -39,15 +39,18 @@ class KafkaHealthcheck(private val brokerId: Int, val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId val sessionExpireListener = new SessionExpireListener - + @volatile var isBrokerRegisteredInZK = false + def startup() { zkClient.subscribeStateChanges(sessionExpireListener) register() + isBrokerRegisteredInZK = true } def shutdown() { zkClient.unsubscribeStateChanges(sessionExpireListener) - ZkUtils.deregisterBrokerInZk(zkClient, brokerId) + if (isBrokerRegisteredInZK) + ZkUtils.deregisterBrokerInZk(zkClient, brokerId) } /** -- 1.8.5.2 (Apple Git-48) From 7392ffe9505602ac37bbc0bc6f043a490ae1fc18 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Sun, 22 Feb 2015 15:42:52 -0800 Subject: [PATCH 2/4] address review comments --- core/src/main/scala/kafka/server/KafkaHealthcheck.scala | 8 -------- core/src/main/scala/kafka/server/KafkaServer.scala | 2 -- 2 files changed, 10 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 4886ae0..7907987 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -39,18 +39,10 @@ class KafkaHealthcheck(private val brokerId: Int, val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId val sessionExpireListener = new SessionExpireListener - @volatile var isBrokerRegisteredInZK = false def startup() { zkClient.subscribeStateChanges(sessionExpireListener) register() - isBrokerRegisteredInZK = true - } - - def shutdown() { - zkClient.unsubscribeStateChanges(sessionExpireListener) - if (isBrokerRegisteredInZK) - ZkUtils.deregisterBrokerInZk(zkClient, brokerId) } /** diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7e5ddcb..426e522 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -310,8 +310,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg if (canShutdown) { Utils.swallow(controlledShutdown()) brokerState.newState(BrokerShuttingDown) - if(kafkaHealthcheck != null) - Utils.swallow(kafkaHealthcheck.shutdown()) if(socketServer != null) Utils.swallow(socketServer.shutdown()) if(requestHandlerPool != null) -- 1.8.5.2 (Apple Git-48) From 1da2231814c48f8c50fe44e9d855c2f8df71ce2f Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Sun, 22 Feb 2015 17:13:18 -0800 Subject: [PATCH 3/4] remove unused util method --- core/src/main/scala/kafka/utils/ZkUtils.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index c78a1b6..8a2fb2d 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -189,12 +189,6 @@ object ZkUtils extends Logging { info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port)) } - def deregisterBrokerInZk(zkClient: ZkClient, id: Int) { - val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id - deletePath(zkClient, brokerIdPath) - info("Deregistered broker %d at path %s.".format(id, brokerIdPath)) - } - def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = { val topicDirs = new ZKGroupTopicDirs(group, topic) topicDirs.consumerOwnerDir + "/" + partition -- 1.8.5.2 (Apple Git-48) From 86d61f5304646d7c4fd50124c308896bc7c9799e Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Sun, 22 Feb 2015 20:41:18 -0800 Subject: [PATCH 4/4] add unit test --- .../server/ConflictBrokerRegistrationTest.scala | 51 ++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 core/src/test/scala/unit/kafka/server/ConflictBrokerRegistrationTest.scala diff --git a/core/src/test/scala/unit/kafka/server/ConflictBrokerRegistrationTest.scala b/core/src/test/scala/unit/kafka/server/ConflictBrokerRegistrationTest.scala new file mode 100644 index 0000000..d758146 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ConflictBrokerRegistrationTest.scala @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.scalatest.junit.JUnit3Suite +import kafka.utils.ZkUtils +import kafka.utils.TestUtils + +import kafka.zk.ZooKeeperTestHarness +import junit.framework.Assert._ + +class ConflictBrokerRegistrationTest extends JUnit3Suite with ZooKeeperTestHarness { + + def testConflictingBrokerId { + // Try starting a broker with the a conflicting broker id. + // This shouldn't affect the existing broker registration. + + val brokerId = 0 + val props1 = TestUtils.createBrokerConfig(brokerId) + val server1 = TestUtils.createServer(new KafkaConfig(props1)) + val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 + + val props2 = TestUtils.createBrokerConfig(brokerId) + try { + TestUtils.createServer(new KafkaConfig(props2)) + fail("Registering a broker with a conflicting id should fail") + } catch { + case e : RuntimeException => + // this is expected + } + + // broker registration shouldn't change + assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1) + server1.shutdown() + } +} \ No newline at end of file -- 1.8.5.2 (Apple Git-48)