From 9c12a5ce7645ec95eb79535dd872e03bc8f3165f Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Fri, 18 Jul 2014 16:24:37 -0700 Subject: [PATCH 1/6] KAFKA-1070. Auto-assign node id. --- .../common/InconsistentBrokerIdException.scala | 25 ++++++ core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +-- core/src/main/scala/kafka/server/KafkaServer.scala | 83 +++++++++++++++--- core/src/main/scala/kafka/utils/ZkUtils.scala | 40 ++++++++- .../kafka/server/ServerGenerateBrokerIdTest.scala | 97 ++++++++++++++++++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 4 +- 6 files changed, 240 insertions(+), 19 deletions(-) create mode 100644 core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala create mode 100644 core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala diff --git a/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala new file mode 100644 index 0000000..5649828 --- /dev/null +++ b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.common
+
+/**
+ * Indicates that the broker.id stored in the log directories does not match the
+ * configured broker.id, or is not consistent across log directories
+ */
+class InconsistentBrokerIdException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1a45f87..a040bc5 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -35,13 +35,13 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   private def getLogRetentionTimeMillis(): Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
-    
+
     if(props.containsKey("log.retention.ms")){
       props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
     }
     else if(props.containsKey("log.retention.minutes")){
       millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
-    } 
+    }
     else {
       millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
     }
@@ -49,7 +49,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   private def getLogRollTimeMillis(): Long = {
     val millisInHour = 60L * 60L * 1000L
-    
+
     if(props.containsKey("log.roll.ms")){
       props.getIntInRange("log.roll.ms", (1, Int.MaxValue))
     }
@@ -57,11 +57,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
       millisInHour * props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue))
     }
   }
-  
+
   /*********** General Configuration ***********/
 
   /* the broker id for this server */
-  val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
+  var brokerId: Int = if (props.containsKey("broker.id")) props.getIntInRange("broker.id", (0, 1000)) else -1
 
   /* the maximum size of message that the server can receive */
   val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2871118..f60f310 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,11 +25,16 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
 import java.io.File
+import java.io.FileOutputStream
+import java.io.FileNotFoundException
+import java.util.Properties
+import collection.mutable
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.Broker
 import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
 import kafka.common.ErrorMapping
+import kafka.common.InconsistentBrokerIdException
 import kafka.network.{Receive, BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
@@ -39,10 +44,12 @@ import com.yammer.metrics.core.Gauge
 * to start up and shutdown a single Kafka node.
*/ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { - this.logIdent = "[Kafka Server " + config.brokerId + "], " + private var isShuttingDown = new AtomicBoolean(false) private var shutdownLatch = new CountDownLatch(1) private var startupComplete = new AtomicBoolean(false) + private var brokerId: Int = -1 + private var metaPropsFile = "meta.properties" val brokerState: BrokerState = new BrokerState val correlationId: AtomicInteger = new AtomicInteger(0) var socketServer: SocketServer = null @@ -76,7 +83,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start scheduler */ kafkaScheduler.startup() - + /* setup zookeeper */ zkClient = initZk() @@ -84,6 +91,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg logManager = createLogManager(zkClient, brokerState) logManager.startup() + /* generate brokerId */ + config.brokerId = getBrokerId(zkClient, config.brokerId, config.logDirs) + this.logIdent = "[Kafka Server " + config.brokerId + "], " + socketServer = new SocketServer(config.brokerId, config.hostName, config.port, @@ -101,31 +112,31 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg offsetManager = createOffsetManager() kafkaController = new KafkaController(config, zkClient, brokerState) - + /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) - + Mx4jLoader.maybeLoad() replicaManager.startup() kafkaController.startup() - + topicConfigManager = new TopicConfigManager(zkClient, logManager) topicConfigManager.startup() - + /* tell everyone we are alive */ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() - + registerStats() startupComplete.set(true) info("started") } - + private def initZk(): ZkClient = { info("Connecting to zookeeper on " + config.zkConnect) val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer) @@ -273,9 +284,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def awaitShutdown(): Unit = shutdownLatch.await() def getLogManager(): LogManager = logManager - + private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { - val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, + val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, segmentMs = config.logRollTimeMillis, flushInterval = config.logFlushIntervalMessages, flushMs = config.logFlushIntervalMs.toLong, @@ -324,5 +335,55 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler) } -} + /** + * if the config has brokerId return brokeId, otherwise generates a sequence id from ZK uses it has brokerId. + * Stores the generated zk sequence id in meta.properties file under all logDirs in config. 
+ */ + private def getBrokerId(zkClient: ZkClient, configBrokerId: Int, logDirs: Seq[String]): Int = { + var brokerId = configBrokerId + var logDirsWithoutMetaProps: List[String] = List() + val metaBrokerIdSet = mutable.HashSet[Int]() + if (brokerId < 0) { + for (logDir <- logDirs) { + val (succeeded, metaBrokerId) = readBrokerIdFromMetaProps(logDir) + if(!succeeded) { + logDirsWithoutMetaProps ++= List(logDir) + } else { + metaBrokerIdSet.add(metaBrokerId) + } + } + + if(metaBrokerIdSet.size > 1) { + throw new InconsistentBrokerIdException("unable to match brokerId across logDirs") + } else if(metaBrokerIdSet.size == 0) { + brokerId = ZkUtils.getBrokerSequenceId(zkClient) + } else { + brokerId = metaBrokerIdSet.last + } + storeBrokerId(brokerId, logDirsWithoutMetaProps) + } + return brokerId + } + private def readBrokerIdFromMetaProps(logDir: String): (Boolean, Int) = { + try { + val metaProps = new VerifiableProperties(Utils.loadProps(logDir+"/meta.properties")) + if (metaProps.containsKey("broker.id")) + return (true, metaProps.getIntInRange("broker.id", (0, Int.MaxValue))) + } catch { + case e: FileNotFoundException => + (false, -1) + } + (false, -1) + } + + private def storeBrokerId(brokerId: Int, logDirs: Seq[String]) { + val metaProps = new Properties() + metaProps.setProperty("broker.id", brokerId.toString); + for(logDir <- logDirs) { + val f = Utils.createFile(logDir+"/"+metaPropsFile) + val out = new FileOutputStream(f) + metaProps.store(out,"") + } + } +} diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index dcdc1ce..a5266d3 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -46,6 +46,7 @@ object ZkUtils extends Logging { val ReassignPartitionsPath = "/admin/reassign_partitions" val DeleteTopicsPath = "/admin/delete_topics" val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" + val BrokerSequenceIdPath = "/brokers/seqid" def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic @@ -87,7 +88,8 @@ object ZkUtils extends Logging { } def setupCommonPaths(zkClient: ZkClient) { - for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath)) + for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, + TopicConfigPath, DeleteTopicsPath, BrokerSequenceIdPath)) makeSurePersistentPathExists(zkClient, path) } @@ -122,6 +124,16 @@ object ZkUtils extends Logging { } } + /** returns a sequence id generated by updating BrokerSequenceIdPath in Zk. + * we add 1000 to the return value from getSequenceId for the backward compatability. + * users can provide brokerId in the config , inorder to avoid conflicts between zk generated + * seqId and config.brokerId we increment zk seqId by 1000. This is a arbitary number will + * be going away in the future versions as we completely rely on zk seqId. + */ + def getBrokerSequenceId(zkClient: ZkClient): Int = { + getSequenceId(zkClient, BrokerSequenceIdPath) + 1000 + } + /** * Gets the in-sync replicas (ISR) for a specific topic and partition */ @@ -691,6 +703,32 @@ object ZkUtils extends Logging { } } + /** + * This API produces a sequence number by creating / updating given path in zookeeper + * It uses the stat returned by the zookeeper and return the version. 
Every time + * client updates the path stat.version gets incremented + */ + def getSequenceId(client: ZkClient, path: String): Int = { + try { + val stat = client.writeDataReturnStat(path, "", -1) + return stat.getVersion + } catch { + case e: ZkNoNodeException => { + createParentPath(client, BrokerSequenceIdPath) + try { + client.createPersistent(BrokerSequenceIdPath, "") + return 0 + } catch { + case e: ZkNodeExistsException => + val stat = client.writeDataReturnStat(BrokerSequenceIdPath, "", -1) + return stat.getVersion + case e2: Throwable => throw e2 + } + } + case e2: Throwable => throw e2 + } + } + def getAllTopics(zkClient: ZkClient): Seq[String] = { val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath) if(topics == null) diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala new file mode 100644 index 0000000..8d28fad --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.zk.ZooKeeperTestHarness +import kafka.utils.{IntEncoder, TestUtils, Utils, VerifiableProperties} +import kafka.utils.TestUtils._ +import java.io.File +import org.junit.Test +import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ + +class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { + val port = TestUtils.choosePort + var props1 = TestUtils.createBrokerConfig(-1, port) + var config1 = new KafkaConfig(props1) + var props2 = TestUtils.createBrokerConfig(0, port) + var config2 = new KafkaConfig(props2) + + @Test + def testAutoGenerateBrokerId() { + var server1 = new KafkaServer(config1) + server1.startup() + // do a clean shutdown and check that offset checkpoint file exists + server1.shutdown() + for(logDir <- config1.logDirs) { + val metaProps = new VerifiableProperties(Utils.loadProps(logDir+"/meta.properties")) + assertTrue(metaProps.containsKey("broker.id")) + assertEquals(metaProps.getInt("broker.id"),1001) + } + // restart the server check to see if it uses the brokerId generated previously + server1 = new KafkaServer(config1) + assertEquals(server1.config.brokerId, 1001) + server1.shutdown() + Utils.rm(server1.config.logDirs) + + + // start the server with broker.id as part of config + var server2 = new KafkaServer(config2) + assertEquals(server2.config.brokerId,0) + server2.shutdown() + Utils.rm(server2.config.logDirs) + + // add multiple logDirs and check if the generate brokerId is stored in all of them + props2 = TestUtils.createBrokerConfig(-1, port) + var logDirs = props2.getProperty("log.dir")+","+TestUtils.tempDir().getAbsolutePath + props2.setProperty("log.dir",logDirs) + config2 = new KafkaConfig(props2) + server1 = new KafkaServer(config2) + server1.startup() + server1.shutdown() + for(logDir <- config2.logDirs) { + val metaProps = new VerifiableProperties(Utils.loadProps(logDir+"/meta.properties")) + assertTrue(metaProps.containsKey("broker.id")) + assertEquals(metaProps.getInt("broker.id"),1002) + } + Utils.rm(server1.config.logDirs) + + // addition to log.dirs after generation of a broker.id from zk should be copied over + props2 = TestUtils.createBrokerConfig(-1,port) + config2 = new KafkaConfig(props2) + server1 = new KafkaServer(config2) + server1.startup() + server1.shutdown() + logDirs = props2.getProperty("log.dir")+","+TestUtils.tempDir().getAbsolutePath + server1.startup() + for(logDir <- config2.logDirs) { + val metaProps = new VerifiableProperties(Utils.loadProps(logDir+"/meta.properties")) + assertTrue(metaProps.containsKey("broker.id")) + assertEquals(metaProps.getInt("broker.id"),1003) + } + server1.shutdown() + Utils.rm(server1.config.logDirs) + verifyNonDaemonThreadsStatus + } + + def verifyNonDaemonThreadsStatus() { + assertEquals(0, Thread.getAllStackTraces.keySet().toArray + .map(_.asInstanceOf[Thread]) + .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka"))) + } + +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c4e13c5..008e6b6 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -91,7 +91,7 @@ object TestUtils extends Logging { Utils.rm(f) } }) - + f } @@ -151,7 +151,7 @@ object TestUtils extends Logging { def createBrokerConfig(nodeId: Int, port: Int = choosePort(), enableControlledShutdown: Boolean = true): Properties = { val props = new Properties - 
props.put("broker.id", nodeId.toString) + if (nodeId >= 0) props.put("broker.id", nodeId.toString) props.put("host.name", "localhost") props.put("port", port.toString) props.put("log.dir", TestUtils.tempDir().getAbsolutePath) -- 1.8.5.2 (Apple Git-48) From 3e0167f2e083640b2e3026cb2503797115e758a2 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sat, 19 Jul 2014 15:36:19 -0700 Subject: [PATCH 2/6] KAFKA-1070. Auto assign node id. --- core/src/main/scala/kafka/server/KafkaServer.scala | 39 ++++++++++++---------- core/src/main/scala/kafka/utils/ZkUtils.scala | 3 +- .../kafka/server/ServerGenerateBrokerIdTest.scala | 14 +++++++- 3 files changed, 36 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index f60f310..865fe66 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -336,38 +336,43 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg } /** - * if the config has brokerId return brokeId, otherwise generates a sequence id from ZK uses it has brokerId. - * Stores the generated zk sequence id in meta.properties file under all logDirs in config. + * if kafka server config has brokerId and there is no meta.properties file returns the config.brokerId, + * otherwise generates a sequence id from ZK uses it has a brokerId. + * stores the generated zk sequence id in meta.properties under logDirs specified in config. + * if config has brokerId and meta.properties contains brokerId if they don't match throws InconsistentBrokerIdException */ private def getBrokerId(zkClient: ZkClient, configBrokerId: Int, logDirs: Seq[String]): Int = { var brokerId = configBrokerId var logDirsWithoutMetaProps: List[String] = List() val metaBrokerIdSet = mutable.HashSet[Int]() - if (brokerId < 0) { - for (logDir <- logDirs) { - val (succeeded, metaBrokerId) = readBrokerIdFromMetaProps(logDir) - if(!succeeded) { - logDirsWithoutMetaProps ++= List(logDir) - } else { - metaBrokerIdSet.add(metaBrokerId) - } + + for (logDir <- logDirs) { + val (succeeded, metaBrokerId) = readBrokerIdFromMetaProps(logDir) + if(!succeeded) { + logDirsWithoutMetaProps ++= List(logDir) + } else { + metaBrokerIdSet.add(metaBrokerId) } + } - if(metaBrokerIdSet.size > 1) { - throw new InconsistentBrokerIdException("unable to match brokerId across logDirs") - } else if(metaBrokerIdSet.size == 0) { + if(metaBrokerIdSet.size > 1) { + throw new InconsistentBrokerIdException("unable to match brokerId across logDirs") + } else if(brokerId >= 0 && metaBrokerIdSet.size == 1 && metaBrokerIdSet.last != brokerId) { + throw new InconsistentBrokerIdException("configured brokerId doesn't match stored brokerId in meta.properties") + } else if(metaBrokerIdSet.size == 0) { + if(brokerId < 0) { brokerId = ZkUtils.getBrokerSequenceId(zkClient) } else { - brokerId = metaBrokerIdSet.last + return brokerId } - storeBrokerId(brokerId, logDirsWithoutMetaProps) } + storeBrokerId(brokerId, logDirsWithoutMetaProps) return brokerId } private def readBrokerIdFromMetaProps(logDir: String): (Boolean, Int) = { try { - val metaProps = new VerifiableProperties(Utils.loadProps(logDir+"/meta.properties")) + val metaProps = new VerifiableProperties(Utils.loadProps(logDir + File.separator + metaPropsFile)) if (metaProps.containsKey("broker.id")) return (true, metaProps.getIntInRange("broker.id", (0, Int.MaxValue))) } catch { @@ -381,7 +386,7 @@ class KafkaServer(val config: 
KafkaConfig, time: Time = SystemTime) extends Logg val metaProps = new Properties() metaProps.setProperty("broker.id", brokerId.toString); for(logDir <- logDirs) { - val f = Utils.createFile(logDir+"/"+metaPropsFile) + val f = Utils.createFile(logDir + File.separator + metaPropsFile) val out = new FileOutputStream(f) metaProps.store(out,"") } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index a5266d3..6b65b4a 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -127,8 +127,7 @@ object ZkUtils extends Logging { /** returns a sequence id generated by updating BrokerSequenceIdPath in Zk. * we add 1000 to the return value from getSequenceId for the backward compatability. * users can provide brokerId in the config , inorder to avoid conflicts between zk generated - * seqId and config.brokerId we increment zk seqId by 1000. This is a arbitary number will - * be going away in the future versions as we completely rely on zk seqId. + * seqId and config.brokerId we increment zk seqId by 1000. */ def getBrokerSequenceId(zkClient: ZkClient): Int = { getSequenceId(zkClient, BrokerSequenceIdPath) + 1000 diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index 8d28fad..f8cf294 100644 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -44,6 +44,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { } // restart the server check to see if it uses the brokerId generated previously server1 = new KafkaServer(config1) + server1.startup() assertEquals(server1.config.brokerId, 1001) server1.shutdown() Utils.rm(server1.config.logDirs) @@ -51,13 +52,15 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { // start the server with broker.id as part of config var server2 = new KafkaServer(config2) + server2.startup() assertEquals(server2.config.brokerId,0) server2.shutdown() Utils.rm(server2.config.logDirs) // add multiple logDirs and check if the generate brokerId is stored in all of them props2 = TestUtils.createBrokerConfig(-1, port) - var logDirs = props2.getProperty("log.dir")+","+TestUtils.tempDir().getAbsolutePath + var logDirs = props2.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath + + "," + TestUtils.tempDir().getAbsolutePath props2.setProperty("log.dir",logDirs) config2 = new KafkaConfig(props2) server1 = new KafkaServer(config2) @@ -84,6 +87,15 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(metaProps.getInt("broker.id"),1003) } server1.shutdown() + // check if configured brokerId and stored brokerId are equal or throw InconsistentBrokerException + props2 = TestUtils.createBrokerConfig(0,port) + config2 = new KafkaConfig(props2) + try { + server1.startup() + } catch { + case e: kafka.common.InconsistentBrokerIdException => //success + } + server1.shutdown() Utils.rm(server1.config.logDirs) verifyNonDaemonThreadsStatus } -- 1.8.5.2 (Apple Git-48) From a670c570626033c69ecdbf41058e1b34b9451af0 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Tue, 22 Jul 2014 11:33:13 -0700 Subject: [PATCH 3/6] KAFKA-1070. Auto-assign node id. 
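
This revision names the previously hard-coded bound: broker ids from 0 up to
ReservedBrokerIdMaxValue (1000) stay reserved for user-configured brokers, and
ZooKeeper-generated ids begin above that range, so the two sources cannot
collide. As a sketch of the resulting arithmetic (illustrative, not code from
this patch):

    // user-configured: 0 <= broker.id <= ReservedBrokerIdMaxValue  (validated in KafkaConfig)
    // zk-generated:    getBrokerSequenceId(zk) == getSequenceId(zk) + 1000  ->  1001, 1002, ...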
--- core/src/main/scala/kafka/server/KafkaConfig.scala | 13 +++++++++---- core/src/main/scala/kafka/server/KafkaServer.scala | 20 +++++++++++--------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a040bc5..7d6c337 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -27,6 +27,8 @@ import kafka.utils.{VerifiableProperties, ZKConfig, Utils} */ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) { + val ReservedBrokerIdMaxValue = 1000 + def this(originalProps: Properties) { this(new VerifiableProperties(originalProps)) props.verify() @@ -60,8 +62,11 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /*********** General Configuration ***********/ - /* the broker id for this server */ - var brokerId: Int = if (props.containsKey("broker.id")) props.getIntInRange("broker.id", (0, 1000)) else -1 + /* The broker id for this server. + * To avoid conflicts between zookeeper generated brokerId and user's config.brokerId + * added ReservedBrokerIdMaxValue and zookeeper sequence starts from ReservedBrokerIdMaxValue + 1. + */ + var brokerId: Int = if (props.containsKey("broker.id")) props.getIntInRange("broker.id", (0, ReservedBrokerIdMaxValue)) else -1 /* the maximum size of message that the server can receive */ val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue)) @@ -106,10 +111,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the maximum number of bytes in a socket request */ val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue)) - + /* the maximum number of connections we allow from each ip address */ val maxConnectionsPerIp: Int = props.getIntInRange("max.connections.per.ip", Int.MaxValue, (1, Int.MaxValue)) - + /* per-ip or hostname overrides to the default maximum number of connections */ val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt)) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 865fe66..be44cdc 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -347,12 +347,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val metaBrokerIdSet = mutable.HashSet[Int]() for (logDir <- logDirs) { - val (succeeded, metaBrokerId) = readBrokerIdFromMetaProps(logDir) - if(!succeeded) { - logDirsWithoutMetaProps ++= List(logDir) - } else { - metaBrokerIdSet.add(metaBrokerId) + val metaBrokerIdOpt = readBrokerIdFromMetaProps(logDir) + metaBrokerIdOpt match { + case Some(metaBrokerId) => + metaBrokerIdSet.add(metaBrokerId) + case None => + logDirsWithoutMetaProps ++= List(logDir) } + } if(metaBrokerIdSet.size > 1) { @@ -370,16 +372,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg return brokerId } - private def readBrokerIdFromMetaProps(logDir: String): (Boolean, Int) = { + private def readBrokerIdFromMetaProps(logDir: String): Option[Int] = { try { val metaProps = new VerifiableProperties(Utils.loadProps(logDir + File.separator + metaPropsFile)) if (metaProps.containsKey("broker.id")) - return (true, 
metaProps.getIntInRange("broker.id", (0, Int.MaxValue))) + return Some(metaProps.getIntInRange("broker.id", (0, Int.MaxValue))) } catch { case e: FileNotFoundException => - (false, -1) + None } - (false, -1) + None } private def storeBrokerId(brokerId: Int, logDirs: Seq[String]) { -- 1.8.5.2 (Apple Git-48) From a53282eb1b9fae908db0c3f9bd03e98b942f2f08 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 24 Jul 2014 20:56:10 -0700 Subject: [PATCH 4/6] KAFKA-1070. Auto-assign node id. --- core/src/main/scala/kafka/server/KafkaConfig.scala | 9 +-- core/src/main/scala/kafka/server/KafkaServer.scala | 25 +++++--- core/src/main/scala/kafka/utils/ZkUtils.scala | 9 +-- .../kafka/server/ServerGenerateBrokerIdTest.scala | 73 +++++++++++++--------- .../unit/kafka/server/ServerShutdownTest.scala | 11 +--- .../test/scala/unit/kafka/utils/TestUtils.scala | 5 ++ 6 files changed, 79 insertions(+), 53 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 7d6c337..07d5cd1 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -27,8 +27,6 @@ import kafka.utils.{VerifiableProperties, ZKConfig, Utils} */ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(props) { - val ReservedBrokerIdMaxValue = 1000 - def this(originalProps: Properties) { this(new VerifiableProperties(originalProps)) props.verify() @@ -62,11 +60,14 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /*********** General Configuration ***********/ + /* Max number that can be used for a broker.id */ + val MaxReservedBrokerId = props.getIntInRange("reserved.broker.max.id", 1000, (0, Int.MaxValue)) + /* The broker id for this server. * To avoid conflicts between zookeeper generated brokerId and user's config.brokerId - * added ReservedBrokerIdMaxValue and zookeeper sequence starts from ReservedBrokerIdMaxValue + 1. + * added MaxReservedBrokerId and zookeeper sequence starts from MaxReservedBrokerId + 1. */ - var brokerId: Int = if (props.containsKey("broker.id")) props.getIntInRange("broker.id", (0, ReservedBrokerIdMaxValue)) else -1 + var brokerId: Int = if (props.containsKey("broker.id")) props.getIntInRange("broker.id", (0, MaxReservedBrokerId)) else -1 /* the maximum size of message that the server can receive */ val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue)) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index be44cdc..4038129 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -92,7 +92,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg logManager.startup() /* generate brokerId */ - config.brokerId = getBrokerId(zkClient, config.brokerId, config.logDirs) + config.brokerId = getBrokerId(zkClient, config) this.logIdent = "[Kafka Server " + config.brokerId + "], " socketServer = new SocketServer(config.brokerId, @@ -337,16 +337,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /** * if kafka server config has brokerId and there is no meta.properties file returns the config.brokerId, - * otherwise generates a sequence id from ZK uses it has a brokerId. + * otherwise generates a sequence id from ZK uses it as the brokerId. 
* stores the generated zk sequence id in meta.properties under logDirs specified in config. * if config has brokerId and meta.properties contains brokerId if they don't match throws InconsistentBrokerIdException */ - private def getBrokerId(zkClient: ZkClient, configBrokerId: Int, logDirs: Seq[String]): Int = { - var brokerId = configBrokerId + private def getBrokerId(zkClient: ZkClient, config: KafkaConfig): Int = { + var brokerId = config.brokerId var logDirsWithoutMetaProps: List[String] = List() val metaBrokerIdSet = mutable.HashSet[Int]() - for (logDir <- logDirs) { + for (logDir <- config.logDirs) { val metaBrokerIdOpt = readBrokerIdFromMetaProps(logDir) metaBrokerIdOpt match { case Some(metaBrokerId) => @@ -363,12 +363,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg throw new InconsistentBrokerIdException("configured brokerId doesn't match stored brokerId in meta.properties") } else if(metaBrokerIdSet.size == 0) { if(brokerId < 0) { - brokerId = ZkUtils.getBrokerSequenceId(zkClient) + brokerId = ZkUtils.getBrokerSequenceId(zkClient,config.MaxReservedBrokerId) } else { return brokerId } + } else { + brokerId = metaBrokerIdSet.last } - storeBrokerId(brokerId, logDirsWithoutMetaProps) + + if(!logDirsWithoutMetaProps.isEmpty) + storeBrokerId(brokerId, logDirsWithoutMetaProps) + return brokerId } @@ -379,7 +384,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg return Some(metaProps.getIntInRange("broker.id", (0, Int.MaxValue))) } catch { case e: FileNotFoundException => + warn("unable to read meta.properties file under dir %s due to %s".format(logDir, e.getMessage)) None + case e1: Exception => + error("unable to read meta.properties file under dir %s due to %s".format(logDir, e1.getMessage)) + throw e1 } None } @@ -391,6 +400,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val f = Utils.createFile(logDir + File.separator + metaPropsFile) val out = new FileOutputStream(f) metaProps.store(out,"") + out.getFD().sync(); + out.close(); } } } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 6b65b4a..30c7e3f 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -33,6 +33,7 @@ import kafka.controller.KafkaController import scala.Some import kafka.controller.LeaderIsrAndControllerEpoch import kafka.common.TopicAndPartition +import kafka.server.KafkaConfig import scala.collection object ZkUtils extends Logging { @@ -47,6 +48,7 @@ object ZkUtils extends Logging { val DeleteTopicsPath = "/admin/delete_topics" val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" val BrokerSequenceIdPath = "/brokers/seqid" + val BrokerIdMaxValue = 1000 def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic @@ -125,12 +127,11 @@ object ZkUtils extends Logging { } /** returns a sequence id generated by updating BrokerSequenceIdPath in Zk. - * we add 1000 to the return value from getSequenceId for the backward compatability. * users can provide brokerId in the config , inorder to avoid conflicts between zk generated - * seqId and config.brokerId we increment zk seqId by 1000. + * seqId and config.brokerId we increment zk seqId by KafkaConfig.MaxReservedBrokerId. 
*/ - def getBrokerSequenceId(zkClient: ZkClient): Int = { - getSequenceId(zkClient, BrokerSequenceIdPath) + 1000 + def getBrokerSequenceId(zkClient: ZkClient, MaxReservedBrokerId: Int): Int = { + getSequenceId(zkClient, BrokerSequenceIdPath) + MaxReservedBrokerId } /** diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index f8cf294..80ecc9c 100644 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -25,10 +25,9 @@ import org.scalatest.junit.JUnit3Suite import junit.framework.Assert._ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { - val port = TestUtils.choosePort - var props1 = TestUtils.createBrokerConfig(-1, port) + var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort) var config1 = new KafkaConfig(props1) - var props2 = TestUtils.createBrokerConfig(0, port) + var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort) var config2 = new KafkaConfig(props2) @Test @@ -48,48 +47,68 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(server1.config.brokerId, 1001) server1.shutdown() Utils.rm(server1.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus + } - + def testUserConfigAndGenratedBrokerId() { // start the server with broker.id as part of config - var server2 = new KafkaServer(config2) + val server1 = new KafkaServer(config1) + val server2 = new KafkaServer(config2) + val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort) + val config3 = new KafkaConfig(props3) + val server3 = new KafkaServer(config3) + server1.startup() + assertEquals(server1.config.brokerId,1001) server2.startup() assertEquals(server2.config.brokerId,0) + server3.startup() + assertEquals(server3.config.brokerId,1002) + server1.shutdown() server2.shutdown() + server3.shutdown() + Utils.rm(server1.config.logDirs) Utils.rm(server2.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus + } + def testMultipleLogDirsMetaProps() { // add multiple logDirs and check if the generate brokerId is stored in all of them - props2 = TestUtils.createBrokerConfig(-1, port) - var logDirs = props2.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath + + var logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath + "," + TestUtils.tempDir().getAbsolutePath - props2.setProperty("log.dir",logDirs) - config2 = new KafkaConfig(props2) - server1 = new KafkaServer(config2) + props1.setProperty("log.dir",logDirs) + config1 = new KafkaConfig(props1) + var server1 = new KafkaServer(config1) server1.startup() server1.shutdown() - for(logDir <- config2.logDirs) { + for(logDir <- config1.logDirs) { val metaProps = new VerifiableProperties(Utils.loadProps(logDir+"/meta.properties")) assertTrue(metaProps.containsKey("broker.id")) - assertEquals(metaProps.getInt("broker.id"),1002) + assertEquals(metaProps.getInt("broker.id"),1001) } - Utils.rm(server1.config.logDirs) // addition to log.dirs after generation of a broker.id from zk should be copied over - props2 = TestUtils.createBrokerConfig(-1,port) - config2 = new KafkaConfig(props2) - server1 = new KafkaServer(config2) + var newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath + props1.setProperty("log.dir",newLogDirs) + config1 = new KafkaConfig(props1) + server1 = new KafkaServer(config1) 
server1.startup() server1.shutdown() - logDirs = props2.getProperty("log.dir")+","+TestUtils.tempDir().getAbsolutePath - server1.startup() - for(logDir <- config2.logDirs) { + for(logDir <- config1.logDirs) { val metaProps = new VerifiableProperties(Utils.loadProps(logDir+"/meta.properties")) assertTrue(metaProps.containsKey("broker.id")) - assertEquals(metaProps.getInt("broker.id"),1003) + assertEquals(metaProps.getInt("broker.id"),1001) } - server1.shutdown() + Utils.rm(server1.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus + } + + @Test + def testConsistentBrokerIdFromUserConfigAndMetaProps() { // check if configured brokerId and stored brokerId are equal or throw InconsistentBrokerException - props2 = TestUtils.createBrokerConfig(0,port) - config2 = new KafkaConfig(props2) + var server1 = new KafkaServer(config1) //auto generate broker Id + server1.startup() + server1.shutdown() + server1 = new KafkaServer(config2) // user specified broker id try { server1.startup() } catch { @@ -97,13 +116,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { } server1.shutdown() Utils.rm(server1.config.logDirs) - verifyNonDaemonThreadsStatus - } - - def verifyNonDaemonThreadsStatus() { - assertEquals(0, Thread.getAllStackTraces.keySet().toArray - .map(_.asInstanceOf[Thread]) - .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka"))) + TestUtils.verifyNonDaemonThreadsStatus } } diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index ab60e9b..e56ee15 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -63,7 +63,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { assertTrue(OffsetCheckpointFile.length() > 0) } producer.close() - + /* now restart the server and check that the written data is still readable and everything still works */ server = new KafkaServer(config) server.startup() @@ -98,7 +98,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { producer.close() server.shutdown() Utils.rm(server.config.logDirs) - verifyNonDaemonThreadsStatus + TestUtils.verifyNonDaemonThreadsStatus } @Test @@ -111,12 +111,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { server.shutdown() server.awaitShutdown() Utils.rm(server.config.logDirs) - verifyNonDaemonThreadsStatus + TestUtils.verifyNonDaemonThreadsStatus } - def verifyNonDaemonThreadsStatus() { - assertEquals(0, Thread.getAllStackTraces.keySet().toArray - .map(_.asInstanceOf[Thread]) - .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka"))) - } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 008e6b6..bfe732e 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -691,6 +691,11 @@ object TestUtils extends Logging { ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath) } + def verifyNonDaemonThreadsStatus() { + assertEquals(0, Thread.getAllStackTraces.keySet().toArray + .map(_.asInstanceOf[Thread]) + .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka"))) + } /** * Create new LogManager instance with default configuration for testing -- 1.8.5.2 (Apple Git-48) From 
f86ed16b0738b5374110733c5160cdd32e738712 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 24 Jul 2014 21:04:35 -0700 Subject: [PATCH 5/6] KAFKA-1070. Auto-assign node id. --- core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala index 80ecc9c..f954b36 100644 --- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -50,6 +50,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.verifyNonDaemonThreadsStatus } + @Test def testUserConfigAndGenratedBrokerId() { // start the server with broker.id as part of config val server1 = new KafkaServer(config1) @@ -71,6 +72,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { TestUtils.verifyNonDaemonThreadsStatus } + @Test def testMultipleLogDirsMetaProps() { // add multiple logDirs and check if the generate brokerId is stored in all of them var logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath + -- 1.8.5.2 (Apple Git-48) From 408f78010eaf0760f8d9fe63487e5e5e40ab4acc Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 21 Aug 2014 10:25:37 -0700 Subject: [PATCH 6/6] KAFKA-1070. Auto-assign node id. --- core/src/main/scala/kafka/server/KafkaServer.scala | 1 + core/src/main/scala/kafka/utils/ZkUtils.scala | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 4038129..33da52a 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -400,6 +400,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val f = Utils.createFile(logDir + File.separator + metaPropsFile) val out = new FileOutputStream(f) metaProps.store(out,"") + out.flush() out.getFD().sync(); out.close(); } diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 30c7e3f..6fe101c 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -48,7 +48,6 @@ object ZkUtils extends Logging { val DeleteTopicsPath = "/admin/delete_topics" val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" val BrokerSequenceIdPath = "/brokers/seqid" - val BrokerIdMaxValue = 1000 def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic -- 1.8.5.2 (Apple Git-48)
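
Note on the sequence-id scheme used throughout this series: getSequenceId never
stores a counter value in the znode. It issues an empty write to /brokers/seqid
and returns the resulting stat.version, which ZooKeeper increments atomically on
every data write, so two brokers can never obtain the same id. Below is a
minimal standalone sketch of the same trick, assuming a ZooKeeper ensemble at
localhost:2181 and the org.I0Itec.zkclient library (the one used above) on the
classpath; SequenceIdDemo and its /demo/seqid path are illustrative names, not
part of these patches.

    import org.I0Itec.zkclient.ZkClient
    import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}

    object SequenceIdDemo {
      val SeqPath = "/demo/seqid"  // stands in for ZkUtils.BrokerSequenceIdPath

      // An empty data write bumps the znode's stat.version by exactly one,
      // so the returned version acts as a cluster-wide monotonic counter.
      def nextSequenceId(zk: ZkClient): Int = {
        try {
          zk.writeDataReturnStat(SeqPath, "", -1).getVersion
        } catch {
          case e: ZkNoNodeException =>
            try {
              zk.createPersistent(SeqPath, true)  // create parents if needed
              0  // a freshly created znode starts at version 0
            } catch {
              case e: ZkNodeExistsException =>
                // another broker created it concurrently; fall back to a plain write
                zk.writeDataReturnStat(SeqPath, "", -1).getVersion
            }
        }
      }

      def main(args: Array[String]) {
        val zk = new ZkClient("localhost:2181", 6000, 6000)
        try {
          println(nextSequenceId(zk))  // e.g. 1
          println(nextSequenceId(zk))  // strictly greater than the previous call
        } finally {
          zk.close()
        }
      }
    }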
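
The meta.properties handling (storeBrokerId and readBrokerIdFromMetaProps, plus
the flush-and-sync added in the last patch) is a plain java.util.Properties
round trip per log directory. A minimal sketch assuming nothing beyond the JDK;
the demo object and the temp directory are illustrative stand-ins for a
configured log.dir:

    import java.io.{File, FileInputStream, FileOutputStream}
    import java.util.Properties

    object MetaPropsDemo {
      def main(args: Array[String]) {
        val logDir = new File(System.getProperty("java.io.tmpdir"), "kafka-logs-demo")
        logDir.mkdirs()
        val metaFile = new File(logDir, "meta.properties")

        // write, mirroring storeBrokerId: flush and fsync before closing
        val metaProps = new Properties()
        metaProps.setProperty("broker.id", "1001")
        val out = new FileOutputStream(metaFile)
        try {
          metaProps.store(out, "")
          out.flush()
          out.getFD.sync()
        } finally {
          out.close()
        }

        // read back, mirroring readBrokerIdFromMetaProps
        val recovered = new Properties()
        val in = new FileInputStream(metaFile)
        try {
          recovered.load(in)
        } finally {
          in.close()
        }
        println("recovered broker.id = " + recovered.getProperty("broker.id"))  // 1001
      }
    }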
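
Finally, the broker-id reconciliation that getBrokerId converges on by patch 4
can be stated independently of ZooKeeper and the filesystem. The helper below is
a compact restatement of those rules, not code from the patches; sys.error
stands in for InconsistentBrokerIdException:

    object BrokerIdRules {
      // configured: broker.id from server config, or -1 when absent
      // stored:     distinct broker.ids found in meta.properties across log dirs
      // generate:   yields a fresh ZooKeeper sequence id (already offset by
      //             MaxReservedBrokerId, as in ZkUtils.getBrokerSequenceId)
      def resolve(configured: Int, stored: Set[Int], generate: () => Int): Int = {
        if (stored.size > 1)
          sys.error("broker.id differs across log dirs")
        else if (configured >= 0 && stored.size == 1 && stored.head != configured)
          sys.error("configured broker.id does not match meta.properties")
        else if (stored.isEmpty) {
          if (configured < 0) generate() else configured
        } else {
          stored.head
        }
      }

      def main(args: Array[String]) {
        println(resolve(-1, Set(), () => 1001))      // fresh broker, no config    -> 1001
        println(resolve(-1, Set(1001), () => 1002))  // restart, id recovered      -> 1001
        println(resolve(0, Set(), () => 1002))       // user-configured first boot -> 0
      }
    }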