From d85781a15cbe896e425632cdb9193ed9a3c39122 Mon Sep 17 00:00:00 2001
From: Edward Ribeiro
Date: Wed, 19 Aug 2015 18:52:23 -0300
Subject: [PATCH] KAFKA-1811 - Ensuring registered broker host:port is unique

---
 core/src/main/scala/kafka/utils/ZKLock.scala       | 94 ++++++++++++++++++++++
 core/src/main/scala/kafka/utils/ZkUtils.scala      | 35 ++++++--
 .../unit/kafka/server/ServerStartupTest.scala      | 16 ++++
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 18 ++++-
 core/src/test/scala/unit/kafka/zk/ZKLockTest.scala | 88 ++++++++++++++++++++
 5 files changed, 243 insertions(+), 8 deletions(-)
 create mode 100644 core/src/main/scala/kafka/utils/ZKLock.scala
 create mode 100644 core/src/test/scala/unit/kafka/zk/ZKLockTest.scala

diff --git a/core/src/main/scala/kafka/utils/ZKLock.scala b/core/src/main/scala/kafka/utils/ZKLock.scala
new file mode 100644
index 0000000..c35101c
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/ZKLock.scala
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.I0Itec.zkclient.ZkClient
+
+/**
+ * This class implements a ZooKeeper-based exclusive reentrant lock, following the classic ZooKeeper lock recipe.
+ * Once Curator is integrated, this class should be replaced by the corresponding Curator recipe.
+ */
+class ZKLock {
+
+  private[this] var lockPath: String = null
+  private[this] var childLockPath: String = null
+  private[this] var zkClient: ZkClient = null
+  private[this] var backoffMs: Int = 0
+  private[this] val DEFAULT_BACKOFF_MS: Int = 500
+
+  @volatile var holdsLock: Boolean = false
+
+  def this(zkClient: ZkClient, path: String, backoff: Option[Int] = None) {
+    this()
+
+    if (zkClient == null)
+      throw new IllegalArgumentException("zkClient cannot be null")
+    this.zkClient = zkClient
+
+    if (path == null || path.isEmpty())
+      throw new IllegalArgumentException("Invalid lock path: %s".format(path))
+    this.lockPath = path
+
+    this.backoffMs = backoff match {
+      case Some(x) => x
+      case None => DEFAULT_BACKOFF_MS
+    }
+  }
+
+  @throws(classOf[InterruptedException])
+  def acquire(): Unit = {
+    this.synchronized {
+      if (holdsLock)
+        return
+
+      if (!ZkUtils.pathExists(zkClient, lockPath))
+        throw new RuntimeException("Persistent lock path '%s' doesn't exist".format(lockPath))
+
+      var acquired: Boolean = false
+      do {
+        acquired = true
+
+        childLockPath = ZkUtils.createEphemeralSequential(zkClient, lockPath + "/exclusive-lock-", "")
+        val candidateId: Long = childLockPath.substring(childLockPath.lastIndexOf("-") + 1).toLong
+
+        val children = ZkUtils.getChildren(zkClient, lockPath)
+        for (child <- children) {
+          val childId: Long = child.substring(child.lastIndexOf("-") + 1).toLong
+          if (childId < candidateId)
+            acquired = false
+        }
+
+        if (!acquired) {
+          ZkUtils.deletePath(zkClient, childLockPath)
+          childLockPath = null
+          Thread.sleep(backoffMs)
+        }
+      } while (!acquired)
+      holdsLock = acquired
+    }

+  }
+
+  def release(): Unit = {
+    this.synchronized {
+      if (!holdsLock)
+        throw new IllegalStateException("Lock is not held for path '%s'".format(childLockPath))
+      ZkUtils.deletePath(zkClient, childLockPath)
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 74b587e..9ee2bb1 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -40,6 +40,7 @@ object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
+  val BrokerRegisterLocksPath = "/brokers/register-locks"
   val ControllerPath = "/controller"
   val ControllerEpochPath = "/controller_epoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
@@ -53,6 +54,7 @@ object ZkUtils extends Logging {
   val persistentZkPaths = Seq(ConsumersPath,
                               BrokerIdsPath,
                               BrokerTopicsPath,
+                              BrokerRegisterLocksPath,
                               EntityConfigChangesPath,
                               ZkUtils.getEntityConfigRootPath(ConfigType.Topic),
                               ZkUtils.getEntityConfigRootPath(ConfigType.Client),
@@ -198,13 +200,25 @@ object ZkUtils extends Logging {
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, advertisedEndpoints: immutable.Map[SecurityProtocol, EndPoint], timeout: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
+    val zkLock = new ZKLock(zkClient, ZkUtils.BrokerRegisterLocksPath, Some(5))
+    zkLock.acquire()
+    try {
+      verifyUniqueHostPort(host, port, zkClient)
+      val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
+      val expectedBroker = new Broker(id, advertisedEndpoints)
+      registerBrokerInZk(zkClient, brokerIdPath, brokerInfo, expectedBroker, timeout)
+
info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(","))) + } finally { + zkLock.release() + } + } - val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp)) - val expectedBroker = new Broker(id, advertisedEndpoints) - - registerBrokerInZk(zkClient, brokerIdPath, brokerInfo, expectedBroker, timeout) - - info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(","))) + private def verifyUniqueHostPort(host:String, port:Int, zkClient: ZkClient): Unit = { + val allBrokers = ZkUtils.getAllBrokersInCluster(zkClient) + for (e <- allBrokers flatMap { broker => broker.endPoints.values }) { + if (e.host == host && e.port == port) + throw new RuntimeException("Host/port combination %s:%d is already registered by an existing broker".format(host, port)) + } } private def registerBrokerInZk(zkClient: ZkClient, brokerIdPath: String, brokerInfo: String, expectedBroker: Broker, timeout: Int) { @@ -360,6 +374,10 @@ object ZkUtils extends Logging { ZkPath.createPersistentSequential(client, path, data) } + def createEphemeralSequential(client: ZkClient, path: String, data: String = ""): String = { + ZkPath.createEphemeralSequential(client, path, data) + } + /** * Update the value of a persistent node with the given path and data. * create parrent directory if necessary. Never throw NodeExistException. @@ -891,4 +909,9 @@ object ZkPath { checkNamespace(client) client.createPersistentSequential(path, data) } + + def createEphemeralSequential(client: ZkClient, path: String, data: Object): String = { + checkNamespace(client) + client.createEphemeralSequential(path, data) + } } diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 0adc0aa..2c9b575 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -17,11 +17,13 @@ package kafka.server +import kafka.cluster.Broker import kafka.utils.ZkUtils import kafka.utils.CoreUtils import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Assert._ import org.junit.Test @@ -68,4 +70,18 @@ class ServerStartupTest extends ZooKeeperTestHarness { server1.shutdown() CoreUtils.rm(server1.config.logDirs) } + + @Test + def testConflictBrokerRegistrationWithSameHostAndPort() { + + val brokers = List(0, 1) + + intercept[RuntimeException] { + for (id <- brokers) { + TestUtils.maybeCreateBrokerRegisterLockPath(zkClient) + val broker = new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT) + ZkUtils.registerBrokerInZk(zkClient, id, "localhost", 6667, broker.endPoints, 6000, jmxPort = -1) + } + } + } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 00fbb61..f966798 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -255,6 +255,17 @@ object TestUtils extends Logging { bytes } + def maybeCreateBrokerRegisterLockPath(zkClient: ZkClient): Unit = { + if (!ZkUtils.pathExists(zkClient, ZkUtils.BrokerRegisterLocksPath)) { + try { + ZkUtils.createPersistentPath(zkClient, 
+      }
+      catch {
+        case e: Exception => // ignore: another broker may have created the path concurrently
+      }
+    }
+  }
+
   /**
    * Generate a random string of letters and digits of the given length
    * @param len The length of the string
@@ -507,13 +518,16 @@ object TestUtils extends Logging {
   }
 
   def createBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
+
+    maybeCreateBrokerRegisterLockPath(zkClient)
+
     val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
-    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, "localhost", 6667, b.endPoints, 6000, jmxPort = -1))
+    brokers.foreach(b => ZkUtils.registerBrokerInZk(zkClient, b.id, "localhost", 6667 + b.id, b.endPoints, 6000, jmxPort = -1))
     brokers
   }
 
   def deleteBrokersInZk(zkClient: ZkClient, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
+    val brokers = ids.map(id => new Broker(id, "localhost", 6667 + id, SecurityProtocol.PLAINTEXT))
     brokers.foreach(b => ZkUtils.deletePath(zkClient, ZkUtils.BrokerIdsPath + "/" + b))
     brokers
   }
diff --git a/core/src/test/scala/unit/kafka/zk/ZKLockTest.scala b/core/src/test/scala/unit/kafka/zk/ZKLockTest.scala
new file mode 100644
index 0000000..21e1c06
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/ZKLockTest.scala
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.zk
+
+import java.util.concurrent.CountDownLatch
+
+import org.junit.Assert.assertEquals
+
+import kafka.utils.{ZKLock, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.junit.Assert._
+import org.junit.{Test, Before}
+
+class ZKLockTest extends ZooKeeperTestHarness {
+
+  val lockPath: String = "/lock_dir"
+  val zkSessionTimeoutMs = 1000
+  val zkConnectionTimeoutMs = 3000
+
+  @Test
+  def testExclusiveZKLock {
+
+    val zkClient = ZkUtils.createZkClient(zkConnect, zkSessionTimeoutMs, zkConnectionTimeoutMs)
+
+    ZkUtils.createPersistentPath(zkClient, lockPath)
+
+    assertTrue(ZkUtils.pathExists(zkClient, lockPath))
+
+    val threads = 10
+    var update = 0
+    var notUpdated = 0
+    val lockHolders = new AtomicInteger()
+    val startBarrier = new CountDownLatch(1)
+    val finishBarrier = new CountDownLatch(threads)
+
+    val task = new Runnable () {
+      override def run {
+
+        startBarrier.await() // wait for the start signal from the main thread
+        val zkLock = new ZKLock(zkClient, lockPath)
+        try {
+          zkLock.acquire()
+          assertEquals(1, lockHolders.incrementAndGet())
+          if (update == 0)
+            update += 1
+          else
+            notUpdated += 1
+        }
+        catch {
+          case e: Throwable => fail(e.getMessage)
+        }
+        finally {
+          assertEquals(0, lockHolders.decrementAndGet())
+          zkLock.release()
+          finishBarrier.countDown()
+        }
+      }
+    }
+
+    for (i <- 1 to threads)
+      new Thread(task).start()
+
+    startBarrier.countDown() // start threads
+    finishBarrier.await() // wait for all threads to finish
+
+    assertEquals(0, ZkUtils.getChildren(zkClient, lockPath).size)
+
+    assertEquals(1, update)
+    assertEquals(threads - 1, notUpdated)
+  }
+}
-- 
2.1.4
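
Note on the ZKLock scaladoc above: it anticipates replacing ZKLock with the corresponding Curator recipe once Curator is integrated. As a rough, non-authoritative sketch of what that replacement could look like, Curator's InterProcessMutex implements the same exclusive lock recipe. The snippet below assumes curator-framework and curator-recipes are on the classpath; the connect string, timeout, and object name are illustrative placeholders rather than part of this patch.

import java.util.concurrent.TimeUnit

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry

object CuratorRegisterLockSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connect string; a broker would reuse its configured ZooKeeper connection instead.
    val client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(500, 3))
    client.start()

    // InterProcessMutex is Curator's implementation of the ephemeral-sequential exclusive lock recipe.
    val lock = new InterProcessMutex(client, "/brokers/register-locks")
    if (lock.acquire(5, TimeUnit.SECONDS)) {
      try {
        // Critical section: verify that no live broker already advertises this host:port, then register.
        println("lock held: host:port uniqueness check and broker registration would run here")
      } finally {
        lock.release()
      }
    }

    client.close()
  }
}

Unlike the polling loop in ZKLock (re-create the child node, scan the siblings, sleep backoffMs, retry), InterProcessMutex creates its child node once and watches only its immediate predecessor, so waiters are woken as soon as the lock is released instead of after a fixed backoff.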