From 7cf8c5fe06f6e9f7b8a17651e78d23e3abbdd49d Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Thu, 20 Nov 2014 10:22:15 -0800
Subject: [PATCH] KAFKA-1070. Auto-assign node id.

---
 .../kafka/common/GenerateBrokerIdException.scala   |  25 ++++
 .../common/InconsistentBrokerIdException.scala     |  25 ++++
 core/src/main/scala/kafka/server/KafkaConfig.scala |  23 +++-
 core/src/main/scala/kafka/server/KafkaServer.scala | 114 ++++++++++++++--
 core/src/main/scala/kafka/utils/Utils.scala        | 109 ++++++++-------
 core/src/main/scala/kafka/utils/ZkUtils.scala      |  38 +++++-
 .../kafka/server/ServerGenerateBrokerIdTest.scala  | 148 +++++++++++++++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   9 +-
 8 files changed, 424 insertions(+), 67 deletions(-)
 create mode 100644 core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
 create mode 100644 core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
 create mode 100644 core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala

diff --git a/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala b/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
new file mode 100644
index 0000000..6fbbef7
--- /dev/null
+++ b/core/src/main/scala/kafka/common/GenerateBrokerIdException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates a failure to generate a broker id, e.g. when an id cannot be
+ * obtained from zookeeper or derived from the configured host name.
+ */
+class GenerateBrokerIdException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
diff --git a/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
new file mode 100644
index 0000000..fe0c475
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that the brokerId stored in logDirs is not consistent across logDirs.
+ */
+class InconsistentBrokerIdException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6e26c54..f6e7b77 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -35,13 +35,13 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   private def getLogRetentionTimeMillis(): Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
-    
+
     if(props.containsKey("log.retention.ms")){
       props.getIntInRange("log.retention.ms", (1, Int.MaxValue))
     }
     else if(props.containsKey("log.retention.minutes")){
       millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue))
-    } 
+    }
     else {
       millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue))
     }
@@ -49,7 +49,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   private def getLogRollTimeMillis(): Long = {
     val millisInHour = 60L * 60L * 1000L
-    
+
     if(props.containsKey("log.roll.ms")){
       props.getIntInRange("log.roll.ms", (1, Int.MaxValue))
     }
@@ -71,8 +71,17 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /*********** General Configuration ***********/
 
-  /* the broker id for this server */
-  val brokerId: Int = props.getIntInRange("broker.id", (0, Int.MaxValue))
+  /* the maximum number that can be used for a configured broker.id */
+  val MaxReservedBrokerId = props.getIntInRange("reserved.broker.max.id", 1000, (0, Int.MaxValue))
+
+  /* the broker id generation policy; must be either "sequence" or "ip" */
+  val brokerIdPolicy = props.getString("broker.id.policy", "sequence")
+
+  /* The broker id for this server.
+   * To avoid conflicts between a zookeeper-generated broker id and a user-configured one,
+   * ids up to MaxReservedBrokerId are reserved for user configuration and the zookeeper
+   * sequence starts at MaxReservedBrokerId + 1.
+   */
+  var brokerId: Int = if (props.containsKey("broker.id")) props.getIntInRange("broker.id", (0, MaxReservedBrokerId)) else -1
 
   /* the maximum size of message that the server can receive */
   val messageMaxBytes = props.getIntInRange("message.max.bytes", 1000000 + MessageSet.LogOverhead, (0, Int.MaxValue))
@@ -117,10 +126,10 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
 
   /* the maximum number of bytes in a socket request */
   val socketRequestMaxBytes: Int = props.getIntInRange("socket.request.max.bytes", 100*1024*1024, (1, Int.MaxValue))
-  
+
   /* the maximum number of connections we allow from each ip address */
   val maxConnectionsPerIp: Int = props.getIntInRange("max.connections.per.ip", Int.MaxValue, (1, Int.MaxValue))
-  
+
   /* per-ip or hostname overrides to the default maximum number of connections */
   val maxConnectionsPerIpOverrides = props.getMap("max.connections.per.ip.overrides").map(entry => (entry._1, entry._2.toInt))
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1bf7d10..6054900 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -25,11 +25,16 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
 import java.io.File
+import java.net.BindException
+import java.io.FileOutputStream
+import java.io.FileNotFoundException
+import java.util.Properties
+import collection.mutable
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.Broker
 import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
-import kafka.common.ErrorMapping
+import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
 import kafka.network.{Receive, BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
@@ -39,10 +44,11 @@ import com.yammer.metrics.core.Gauge
  * to start up and shutdown a single Kafka node.
  */
 class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
-  this.logIdent = "[Kafka Server " + config.brokerId + "], "
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
   private var startupComplete = new AtomicBoolean(false)
+  private var brokerId: Int = -1
+  private val metaPropsFile = "meta.properties"
   val brokerState: BrokerState = new BrokerState
   val correlationId: AtomicInteger = new AtomicInteger(0)
   var socketServer: SocketServer = null
@@ -77,7 +83,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
         /* start scheduler */
         kafkaScheduler.startup()
-        
+
         /* setup zookeeper */
         zkClient = initZk()
 
@@ -85,6 +91,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         logManager = createLogManager(zkClient, brokerState)
         logManager.startup()
 
+        /* generate brokerId */
+        config.brokerId = getBrokerId(zkClient, config)
+        this.logIdent = "[Kafka Server " + config.brokerId + "], "
+
         socketServer = new SocketServer(config.brokerId,
                                         config.hostName,
                                         config.port,
@@ -103,26 +113,25 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         offsetManager = createOffsetManager()
 
         kafkaController = new KafkaController(config, zkClient, brokerState)
-        
+
         /* start processing requests */
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
         brokerState.newState(RunningAsBroker)
-        
+
         Mx4jLoader.maybeLoad()
 
         replicaManager.startup()
 
         kafkaController.startup()
-        
+
         topicConfigManager = new TopicConfigManager(zkClient, logManager)
         topicConfigManager.startup()
-        
+
         /* tell everyone we are alive */
         kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
         kafkaHealthcheck.startup()
-
         registerStats()
         startupComplete.set(true)
         info("started")
@@ -306,7 +315,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def awaitShutdown(): Unit = shutdownLatch.await()
 
   def getLogManager(): LogManager = logManager
-  
+
   private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
     val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,
                                      segmentMs = config.logRollTimeMillis,
@@ -358,5 +367,90 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler)
   }
 
-}
+  /**
+   * Determines the broker id to use:
+   * if the config provides a brokerId and no meta.properties file exists, returns config.brokerId;
+   * otherwise generates a broker id based on broker.id.policy and stores the generated id
+   * in a meta.properties file under each of the logDirs specified in the config.
+   * If both the config and meta.properties provide a brokerId and the two do not match,
+   * an InconsistentBrokerIdException is thrown.
+   */
+  private def getBrokerId(zkClient: ZkClient, config: KafkaConfig): Int = {
+    var brokerId = config.brokerId
+    var logDirsWithoutMetaProps: List[String] = List()
+    val metaBrokerIdSet = mutable.HashSet[Int]()
+
+    for (logDir <- config.logDirs) {
+      val metaBrokerIdOpt = readBrokerIdFromMetaProps(logDir)
+      metaBrokerIdOpt match {
+        case Some(metaBrokerId) =>
+          metaBrokerIdSet.add(metaBrokerId)
+        case None =>
+          logDirsWithoutMetaProps ++= List(logDir)
+      }
+    }
+
+    if(metaBrokerIdSet.size > 1) {
+      throw new InconsistentBrokerIdException("failed to match brokerId across logDirs")
+    } else if(brokerId >= 0 && metaBrokerIdSet.size == 1 && metaBrokerIdSet.last != brokerId) {
+      throw new InconsistentBrokerIdException("configured brokerId doesn't match stored brokerId in meta.properties")
+    } else if(metaBrokerIdSet.size == 0) {
+      if(brokerId < 0) {
+        brokerId = generateBrokerId(config)
+      } else {
+        return brokerId
+      }
+    } else {
+      brokerId = metaBrokerIdSet.last
+    }
+
+    if(logDirsWithoutMetaProps.nonEmpty)
+      storeBrokerId(brokerId, logDirsWithoutMetaProps)
+    brokerId
+  }
+
+  private def generateBrokerId(config: KafkaConfig): Int = {
+    try {
+      config.brokerIdPolicy.trim.toLowerCase match {
+        case "ip" =>
+          Utils.ipToInt(config.hostName)
+        case _ =>
+          ZkUtils.getBrokerSequenceId(zkClient, config.MaxReservedBrokerId)
+      }
+    } catch {
+      case e: Exception =>
+        error("failed to generate broker.id due to %s".format(e.getMessage))
+        throw new GenerateBrokerIdException("failed to generate broker.id")
+    }
+  }
+
+  private def readBrokerIdFromMetaProps(logDir: String): Option[Int] = {
+    try {
+      val metaProps = new VerifiableProperties(Utils.loadProps(logDir + File.separator + metaPropsFile))
+      if (metaProps.containsKey("broker.id"))
+        return Some(metaProps.getIntInRange("broker.id", (0, Int.MaxValue)))
+    } catch {
+      case e: FileNotFoundException =>
+        warn("failed to read meta.properties file under dir %s due to %s".format(logDir, e.getMessage))
+        None
+      case e1: Exception =>
+        error("failed to read meta.properties file under dir %s due to %s".format(logDir, e1.getMessage))
+        throw e1
+    }
+    None
+  }
+
+  private def storeBrokerId(brokerId: Int, logDirs: Seq[String]) {
+    val metaProps = new Properties()
+    metaProps.setProperty("broker.id", brokerId.toString)
+    for(logDir <- logDirs) {
+      val f = Utils.createFile(logDir + File.separator + metaPropsFile)
+      val out = new FileOutputStream(f)
+      metaProps.store(out, "")
+      out.flush()
+      out.getFD().sync()
+      out.close()
+    }
+  }
+
+}
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 23aefb4..3e903c4 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -19,6 +19,7 @@ package kafka.utils
 
 import java.io._
 import java.nio._
+import java.net._
 import charset.Charset
 import java.nio.channels._
 import java.util.concurrent.locks.{ReadWriteLock, Lock}
@@ -33,10 +34,10 @@ import kafka.common.KafkaStorageException
 
 /**
  * General helper functions!
- * 
+ *
  * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in
- * the standard library etc. 
- * 
+ * the standard library etc.
+ *
  * If you are making a new helper function and want to add it to this class please ensure the following:
  * 1. It has documentation
  * 2. It is the most general possible utility, not just the thing you needed in one particular place
@@ -68,18 +69,18 @@ object Utils extends Logging {
    * @param runnable The runnable to execute in the background
    * @return The unstarted thread
    */
-  def daemonThread(name: String, runnable: Runnable): Thread = 
+  def daemonThread(name: String, runnable: Runnable): Thread =
     newThread(name, runnable, true)
-  
+
   /**
    * Create a daemon thread
    * @param name The name of the thread
    * @param fun The function to execute in the thread
    * @return The unstarted thread
    */
-  def daemonThread(name: String, fun: () => Unit): Thread = 
+  def daemonThread(name: String, fun: () => Unit): Thread =
     daemonThread(name, runnable(fun))
-  
+
   /**
    * Create a new thread
    * @param name The name of the thread
@@ -88,16 +89,16 @@ object Utils extends Logging {
    * @return The unstarted thread
    */
   def newThread(name: String, runnable: Runnable, daemon: Boolean): Thread = {
-    val thread = new Thread(runnable, name) 
+    val thread = new Thread(runnable, name)
     thread.setDaemon(daemon)
     thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
       def uncaughtException(t: Thread, e: Throwable) {
         error("Uncaught exception in thread '" + t.getName + "':", e)
-      } 
+      }
     })
     thread
   }
-  
+
   /**
    * Create a new thread
    * @param runnable The work for the thread to do
@@ -114,7 +115,7 @@ object Utils extends Logging {
     })
     thread
   }
-  
+
   /**
   * Read the given byte buffer into a byte array
   */
@@ -161,7 +162,7 @@ object Utils extends Logging {
     else
       new FileInputStream(file).getChannel()
   }
-  
+
   /**
   * Do the given action and log any exceptions thrown without rethrowing them
   * @param log The log method to use for logging. E.g. logger.warn
@@ -174,7 +175,7 @@ object Utils extends Logging {
       case e: Throwable => log(e.getMessage(), e)
     }
   }
-  
+
   /**
   * Test if two byte buffers are equal. In this case equality means having
   * the same bytes from the current position to the limit
@@ -191,7 +192,7 @@ object Utils extends Logging {
         return false
     return true
   }
-  
+
   /**
   * Translate the given buffer into a string
   * @param buffer The buffer to translate
@@ -202,7 +203,7 @@ object Utils extends Logging {
     buffer.get(bytes)
     new String(bytes, encoding)
   }
-  
+
   /**
   * Print an error message and shutdown the JVM
   * @param message The error message
@@ -211,19 +212,19 @@ object Utils extends Logging {
     System.err.println(message)
     System.exit(1)
   }
-  
+
   /**
   * Recursively delete the given file/directory and any subfiles (if any exist)
   * @param file The root file at which to begin deleting
   */
   def rm(file: String): Unit = rm(new File(file))
-  
+
   /**
   * Recursively delete the list of files/directories and any subfiles (if any exist)
  * @param files sequence of files to be deleted
  */
  def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f)))
-  
+
   /**
   * Recursively delete the given file/directory and any subfiles (if any exist)
   * @param file The root file at which to begin deleting
@@ -242,7 +243,7 @@ object Utils extends Logging {
       file.delete()
     }
   }
-  
+
   /**
   * Register the given mbean with the platform mbean server,
   * unregistering any mbean that was there before. Note,
@@ -270,7 +271,7 @@ object Utils extends Logging {
       }
     }
   }
-  
+
   /**
   * Unregister the mbean with the given name, if there is one registered
   * @param name The mbean name to unregister
@@ -283,16 +284,16 @@ object Utils extends Logging {
       mbs.unregisterMBean(objName)
     }
   }
-  
+
   /**
-   * Read an unsigned integer from the current position in the buffer, 
+   * Read an unsigned integer from the current position in the buffer,
   * incrementing the position by 4 bytes
   * @param buffer The buffer to read from
   * @return The integer read, as a long to avoid signedness
   */
-  def readUnsignedInt(buffer: ByteBuffer): Long = 
+  def readUnsignedInt(buffer: ByteBuffer): Long =
     buffer.getInt() & 0xffffffffL
-  
+
   /**
   * Read an unsigned integer from the given position without modifying the buffers
   * position
@@ -300,33 +301,33 @@ object Utils extends Logging {
   * @param index the index from which to read the integer
   * @return The integer read, as a long to avoid signedness
   */
-  def readUnsignedInt(buffer: ByteBuffer, index: Int): Long = 
+  def readUnsignedInt(buffer: ByteBuffer, index: Int): Long =
     buffer.getInt(index) & 0xffffffffL
-  
+
   /**
   * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
   * @param buffer The buffer to write to
   * @param value The value to write
   */
-  def writetUnsignedInt(buffer: ByteBuffer, value: Long): Unit = 
+  def writetUnsignedInt(buffer: ByteBuffer, value: Long): Unit =
     buffer.putInt((value & 0xffffffffL).asInstanceOf[Int])
-  
+
   /**
   * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
   * @param buffer The buffer to write to
   * @param index The position in the buffer at which to begin writing
   * @param value The value to write
   */
-  def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = 
+  def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit =
     buffer.putInt(index, (value & 0xffffffffL).asInstanceOf[Int])
-  
+
   /**
   * Compute the CRC32 of the byte array
   * @param bytes The array to compute the checksum for
   * @return The CRC32
   */
   def crc32(bytes: Array[Byte]): Long = crc32(bytes, 0, bytes.length)
-  
+
   /**
   * Compute the CRC32 of the segment of the byte array given by the specified size and offset
   * @param bytes The bytes to checksum
@@ -339,7 +340,7 @@ object Utils extends Logging {
     crc.update(bytes, offset, size)
     crc.getValue()
   }
-  
+
   /**
   * Compute the hash code for the given items
   */
@@ -356,7 +357,7 @@ object Utils extends Logging {
     }
     return h
   }
-  
+
   /**
   * Group the given values by keys extracted with the given function
   */
@@ -368,12 +369,12 @@ object Utils extends Logging {
         case Some(l: List[V]) => m.put(k, v :: l)
         case None => m.put(k, List(v))
       }
-    } 
+    }
     m
   }
-  
+
   /**
-   * Read some bytes into the provided buffer, and return the number of bytes read. If the 
+   * Read some bytes into the provided buffer, and return the number of bytes read. If the
   * channel has been closed or we get -1 on the read for any reason, throw an EOFException
   */
   def read(channel: ReadableByteChannel, buffer: ByteBuffer): Int = {
@@ -381,8 +382,8 @@ object Utils extends Logging {
       case -1 => throw new EOFException("Received -1 when reading from channel, socket has likely been closed.")
       case n: Int => n
     }
-  } 
-  
+  }
+
   /**
   * Throw an exception if the given value is null, else return it. You can use this like:
   * val myValue = Utils.notNull(expressionThatShouldntBeNull)
@@ -411,11 +412,11 @@ object Utils extends Logging {
   def parseCsvMap(str: String): Map[String, String] = {
     val map = new mutable.HashMap[String, String]
     if("".equals(str))
-      return map 
+      return map
     val keyVals = str.split("\\s*,\\s*").map(s => s.split("\\s*:\\s*"))
     keyVals.map(pair => (pair(0), pair(1))).toMap
   }
-  
+
   /**
   * Parse a comma separated string into a sequence of strings.
   * Whitespace surrounding the comma will be removed.
@@ -467,7 +468,7 @@ object Utils extends Logging {
       stream.close()
     }
   }
-  
+
   /**
   * Get the absolute value of the given number. If the number is Int.MinValue return 0.
   * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
@@ -496,7 +497,7 @@ object Utils extends Logging {
       throw new KafkaStorageException("Failed to create file %s.".format(path))
     f
   }
-  
+
   /**
   * Turn a properties map into a string
   */
@@ -505,7 +506,7 @@ object Utils extends Logging {
     props.store(writer, "")
     writer.toString
   }
-  
+
   /**
   * Read some properties with the given default values
   */
@@ -515,7 +516,7 @@ object Utils extends Logging {
     props.load(reader)
     props
   }
-  
+
   /**
   * Read a big-endian integer from a byte array
   */
@@ -525,7 +526,7 @@ object Utils extends Logging {
     ((bytes(offset + 2) & 0xFF) << 8) |
     (bytes(offset + 3) & 0xFF)
   }
-  
+
   /**
   * Execute the given function inside the lock
   */
@@ -564,7 +565,7 @@ object Utils extends Logging {
      */
     case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int)
     case c => c
-  }.mkString 
+  }.mkString
   }
 
   /**
@@ -576,4 +577,18 @@ object Utils extends Logging {
       .filter{ case (k,l) => (l > 1) }
       .keys
   }
+
+  /**
+   * Returns an integer derived from the given host's IP address
+   * (or from the local host's address when no host is given).
+   */
+  def ipToInt(host: String): Int = {
+    val inetAddress =
+      if (host == null || host.trim.isEmpty || host == "localhost")
+        InetAddress.getLocalHost()
+      else
+        InetAddress.getByName(host)
+    // pack the address bytes into a non-negative Int; concatenating the dotted-decimal
+    // digits would overflow Int (and throw) for most real-world addresses
+    inetAddress.getAddress.foldLeft(0)((acc, octet) => (acc << 8) | (octet & 0xff)) & 0x7fffffff
+  }
+
 }
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 56e3e88..c14bd45 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -46,6 +46,7 @@ object ZkUtils extends Logging {
   val ReassignPartitionsPath = "/admin/reassign_partitions"
   val DeleteTopicsPath = "/admin/delete_topics"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
+  val BrokerSequenceIdPath = "/brokers/seqid"
 
   def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic
@@ -87,7 +88,8 @@ object ZkUtils extends Logging {
   }
 
   def setupCommonPaths(zkClient: ZkClient) {
-    for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
+    for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath,
+                    DeleteTopicsPath, BrokerSequenceIdPath))
       makeSurePersistentPathExists(zkClient, path)
   }
 
@@ -122,6 +124,14 @@ object ZkUtils extends Logging {
     }
   }
 
+  /**
+   * Returns a sequence id generated by updating BrokerSequenceIdPath in zookeeper.
+   * Users can provide a brokerId in the config; in order to avoid conflicts between
+   * zookeeper-generated sequence ids and config.brokerId, the zookeeper sequence id
+   * is incremented by KafkaConfig.MaxReservedBrokerId.
+ */ + def getBrokerSequenceId(zkClient: ZkClient, MaxReservedBrokerId: Int): Int = { + getSequenceId(zkClient, BrokerSequenceIdPath) + MaxReservedBrokerId + } + /** * Gets the in-sync replicas (ISR) for a specific topic and partition */ @@ -696,6 +706,32 @@ object ZkUtils extends Logging { } } + /** + * This API produces a sequence number by creating / updating given path in zookeeper + * It uses the stat returned by the zookeeper and return the version. Every time + * client updates the path stat.version gets incremented + */ + def getSequenceId(client: ZkClient, path: String): Int = { + try { + val stat = client.writeDataReturnStat(path, "", -1) + return stat.getVersion + } catch { + case e: ZkNoNodeException => { + createParentPath(client, BrokerSequenceIdPath) + try { + client.createPersistent(BrokerSequenceIdPath, "") + return 0 + } catch { + case e: ZkNodeExistsException => + val stat = client.writeDataReturnStat(BrokerSequenceIdPath, "", -1) + return stat.getVersion + case e2: Throwable => throw e2 + } + } + case e2: Throwable => throw e2 + } + } + def getAllTopics(zkClient: ZkClient): Seq[String] = { val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath) if(topics == null) diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala new file mode 100644 index 0000000..d41db46 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.zk.ZooKeeperTestHarness +import kafka.utils.{IntEncoder, TestUtils, Utils, VerifiableProperties} +import kafka.utils.TestUtils._ +import java.io.File +import org.junit.Test +import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ + +class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness { + var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort) + var config1 = new KafkaConfig(props1) + var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort) + var config2 = new KafkaConfig(props2) + + + @Test + def testAutoGenerateBrokerId() { + var server1 = new KafkaServer(config1) + server1.startup() + // do a clean shutdown and check that offset checkpoint file exists + server1.shutdown() + for(logDir <- config1.logDirs) { + val metaProps = new VerifiableProperties(Utils.loadProps(logDir+"/meta.properties")) + assertTrue(metaProps.containsKey("broker.id")) + assertEquals(metaProps.getInt("broker.id"),1001) + } + // restart the server check to see if it uses the brokerId generated previously + server1 = new KafkaServer(config1) + server1.startup() + assertEquals(server1.config.brokerId, 1001) + server1.shutdown() + Utils.rm(server1.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus + } + + @Test + def testAutoGenerateBrokerIdWithIPPolicy() { + val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort) + props3.put("broker.id.policy","ip") + val config3 = new KafkaConfig(props3) + var server1 = new KafkaServer(config3) + server1.startup() + // do a clean shutdown and check that offset checkpoint file exists + server1.shutdown() + for(logDir <- config3.logDirs) { + val metaProps = new VerifiableProperties(Utils.loadProps(logDir + File.separator + "meta.properties")) + assertTrue(metaProps.containsKey("broker.id")) + assertEquals(metaProps.getInt("broker.id"),Utils.ipToInt(config3.hostName)) + } + // restart the server check to see if it uses the brokerId generated previously + server1 = new KafkaServer(config3) + server1.startup() + assertEquals(server1.config.brokerId, Utils.ipToInt(config3.hostName)) + server1.shutdown() + Utils.rm(server1.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus + } + + @Test + def testUserConfigAndGenratedBrokerId() { + // start the server with broker.id as part of config + val server1 = new KafkaServer(config1) + val server2 = new KafkaServer(config2) + val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort) + val config3 = new KafkaConfig(props3) + val server3 = new KafkaServer(config3) + server1.startup() + assertEquals(server1.config.brokerId,1001) + server2.startup() + assertEquals(server2.config.brokerId,0) + server3.startup() + assertEquals(server3.config.brokerId,1002) + server1.shutdown() + server2.shutdown() + server3.shutdown() + Utils.rm(server1.config.logDirs) + Utils.rm(server2.config.logDirs) + TestUtils.verifyNonDaemonThreadsStatus + } + + @Test + def testMultipleLogDirsMetaProps() { + // add multiple logDirs and check if the generate brokerId is stored in all of them + var logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath + + "," + TestUtils.tempDir().getAbsolutePath + props1.setProperty("log.dir",logDirs) + config1 = new KafkaConfig(props1) + var server1 = new KafkaServer(config1) + server1.startup() + server1.shutdown() + for(logDir <- config1.logDirs) { + val metaProps = new VerifiableProperties(Utils.loadProps(logDir+"/meta.properties")) + assertTrue(metaProps.containsKey("broker.id")) 
+      assertEquals(1001, metaProps.getInt("broker.id"))
+    }
+
+    // log dirs added after a broker.id has been generated from zk should get a copy of meta.properties
+    val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath
+    props1.setProperty("log.dir", newLogDirs)
+    config1 = new KafkaConfig(props1)
+    server1 = new KafkaServer(config1)
+    server1.startup()
+    server1.shutdown()
+    for(logDir <- config1.logDirs) {
+      val metaProps = new VerifiableProperties(Utils.loadProps(logDir + File.separator + "meta.properties"))
+      assertTrue(metaProps.containsKey("broker.id"))
+      assertEquals(1001, metaProps.getInt("broker.id"))
+    }
+    Utils.rm(server1.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus()
+  }
+
+  @Test
+  def testConsistentBrokerIdFromUserConfigAndMetaProps() {
+    // a configured brokerId and a stored brokerId must match, otherwise an InconsistentBrokerIdException is expected
+    var server1 = new KafkaServer(config1) // auto-generated broker id
+    server1.startup()
+    server1.shutdown()
+    server1 = new KafkaServer(config2) // user-specified broker id
+    try {
+      server1.startup()
+    } catch {
+      case e: kafka.common.InconsistentBrokerIdException => // success
+    }
+    server1.shutdown()
+    Utils.rm(server1.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus()
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0da774d..9b286f5 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -94,7 +94,7 @@ object TestUtils extends Logging {
         Utils.rm(f)
       }
     })
-  
+
     f
   }
 
@@ -154,7 +154,7 @@ object TestUtils extends Logging {
   def createBrokerConfig(nodeId: Int, port: Int = choosePort(), enableControlledShutdown: Boolean = true): Properties = {
     val props = new Properties
-    props.put("broker.id", nodeId.toString)
+    if (nodeId >= 0) props.put("broker.id", nodeId.toString)
     props.put("host.name", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
@@ -698,6 +698,11 @@ object TestUtils extends Logging {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }
 
+  def verifyNonDaemonThreadsStatus() {
+    assertEquals(0, Thread.getAllStackTraces.keySet().toArray
+      .map(_.asInstanceOf[Thread])
+      .count(t => !t.isDaemon && t.isAlive && t.getClass.getCanonicalName.toLowerCase.startsWith("kafka")))
+  }
 
   /**
   * Create new LogManager instance with default configuration for testing
-- 
1.9.3 (Apple Git-50)
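
Supplementary sketches (not part of the patch):

The broker-id resolution in KafkaServer.getBrokerId() reduces to a handful of
precedence rules. This minimal standalone Scala sketch mirrors that logic; the
name resolveBrokerId and its parameters are illustrative only, not the patch's API:

  object BrokerIdResolution {
    // configuredId: config.brokerId, or -1 when broker.id is not set
    // storedIds:    broker ids found in meta.properties across all log dirs
    def resolveBrokerId(configuredId: Int, storedIds: Set[Int], generate: () => Int): Int =
      if (storedIds.size > 1)
        throw new IllegalStateException("brokerId differs across logDirs")          // InconsistentBrokerIdException in the patch
      else if (storedIds.size == 1 && configuredId >= 0 && storedIds.head != configuredId)
        throw new IllegalStateException("configured brokerId != stored brokerId")   // InconsistentBrokerIdException in the patch
      else if (storedIds.size == 1)
        storedIds.head      // reuse the persisted id
      else if (configuredId >= 0)
        configuredId        // honour the user's configuration
      else
        generate()          // fall back to the "sequence" or "ip" policy
  }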
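
The sequence-id trick in ZkUtils.getSequenceId() uses the znode's stat.version as a
counter: every data write bumps the version by one, so concurrent brokers each
observe a distinct value. A minimal sketch of the core idea, omitting the
create-if-missing handling the patch performs (nextSequenceId is an illustrative name):

  import org.I0Itec.zkclient.ZkClient

  object SequenceIdSketch {
    // an unconditional write (expected version -1) increments stat.version atomically
    def nextSequenceId(zk: ZkClient, path: String): Int =
      zk.writeDataReturnStat(path, "", -1).getVersion
  }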
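
The meta.properties round trip relies only on java.util.Properties. Assuming a
hypothetical stand-in directory, the file can be written and read back like so
(mirroring storeBrokerId() and readBrokerIdFromMetaProps(), which goes through
Utils.loadProps and VerifiableProperties instead):

  import java.io.{File, FileInputStream, FileOutputStream}
  import java.util.Properties

  object MetaPropsRoundTrip extends App {
    val dir = new File(System.getProperty("java.io.tmpdir"))  // stand-in for a log dir

    // write, as storeBrokerId() does
    val metaProps = new Properties()
    metaProps.setProperty("broker.id", "1001")
    val out = new FileOutputStream(new File(dir, "meta.properties"))
    try metaProps.store(out, "") finally out.close()

    // read back and parse the stored id
    val in = new FileInputStream(new File(dir, "meta.properties"))
    val loaded = new Properties()
    try loaded.load(in) finally in.close()
    println(loaded.getProperty("broker.id").toInt)  // prints 1001
  }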