From 7b4fdbcddd8f0bea63475ce2533a6dd317302fac Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Wed, 20 May 2015 18:12:10 -0700 Subject: [PATCH 1/5] KAFKA-2205: First commit --- core/src/main/scala/kafka/admin/AdminUtils.scala | 56 ++++++-- .../src/main/scala/kafka/admin/ConfigCommand.scala | 156 +++++++++++++++++++++ core/src/main/scala/kafka/admin/TopicCommand.scala | 1 + .../kafka/controller/TopicDeletionManager.scala | 5 +- .../main/scala/kafka/server/ConfigHandler.scala | 56 ++++++++ .../scala/kafka/server/DynamicConfigManager.scala | 138 ++++++++++++++++++ core/src/main/scala/kafka/server/KafkaServer.scala | 20 ++- .../scala/kafka/server/TopicConfigManager.scala | 15 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 22 ++- .../test/scala/unit/kafka/admin/AdminTest.scala | 8 +- .../scala/unit/kafka/admin/ConfigCommandTest.scala | 73 ++++++++++ .../scala/unit/kafka/admin/TopicCommandTest.scala | 2 +- .../kafka/server/DynamicConfigChangeTest.scala | 78 ++++++++++- 13 files changed, 592 insertions(+), 38 deletions(-) create mode 100644 core/src/main/scala/kafka/admin/ConfigCommand.scala create mode 100644 core/src/main/scala/kafka/server/ConfigHandler.scala create mode 100644 core/src/main/scala/kafka/server/DynamicConfigManager.scala create mode 100644 core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index f06edf4..d0dac2b 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -21,6 +21,7 @@ import kafka.common._ import kafka.cluster.{BrokerEndPoint, Broker} import kafka.log.LogConfig +import kafka.server.ConfigType import kafka.utils._ import kafka.api.{TopicMetadata, PartitionMetadata} @@ -40,10 +41,8 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException object AdminUtils extends Logging { val rand = new Random - val AdminClientId = "__admin_client" - - val 
TopicConfigChangeZnodePrefix = "config_change_" + val EntityConfigChangeZnodePrefix = "config_change_" /** * There are 2 goals of replica assignment: @@ -249,7 +248,7 @@ object AdminUtils extends Logging { partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment)) // write out the config if there is any, this isn't transactional with the partition assignments - writeTopicConfig(zkClient, topic, config) + writeEntityConfig(zkClient, ConfigType.Topics, topic, config) // create the partition assignment writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update) @@ -273,7 +272,19 @@ object AdminUtils extends Logging { case e2: Throwable => throw new AdminOperationException(e2.toString) } } - + + /** + * Update the config for a client and create a change notification so the change will propagate to other brokers + * @param zkClient: The ZkClient handle used to write the new config to zookeeper + * @param clientId: The clientId for which configs are being changed + * @param configs: The final set of configs that will be applied to the topic. 
If any new configs need to be added or + * existing configs need to be deleted, it should be done prior to invoking this API + * + */ + def changeClientIdConfig(zkClient: ZkClient, clientId: String, configs: Properties) { + changeEntityConfig(zkClient, ConfigType.Clients, clientId, configs) + } + /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers * @param zkClient: The ZkClient handle used to write the new config to zookeeper @@ -285,34 +296,49 @@ object AdminUtils extends Logging { def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) { if(!topicExists(zkClient, topic)) throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) - // remove the topic overrides LogConfig.validate(configs) + changeEntityConfig(zkClient, ConfigType.Topics, topic, configs) + } + private def changeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, configs: Properties) { // write the new config--may not exist if there were previously no overrides - writeTopicConfig(zkClient, topic, configs) - + writeEntityConfig(zkClient, entityType, entityName, configs) + // create the change notification - zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic)) + val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix + val content = Json.encode(getConfigChangeZnodeData(entityType, entityName)) + zkClient.createPersistentSequential(seqNode, content) } - + + def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = { + Map("version" -> 1, "entityType" -> entityType, "entityName" -> entityName) + } + /** * Write out the topic config to zk, if there is any */ - private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { + private def writeEntityConfig(zkClient: ZkClient, entityType: String, entityName: String, 
config: Properties) { val configMap: mutable.Map[String, String] = { import JavaConversions._ config } val map = Map("version" -> 1, "config" -> configMap) - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map)) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getEntityConfigPath(entityType, entityName), Json.encode(map)) } /** * Read the topic config (if any) from zk */ def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = { - val str: String = zkClient.readData(ZkUtils.getTopicConfigPath(topic), true) + fetchEntityConfig(zkClient, ConfigType.Topics, topic) + } + + /** + * Read the entity (topic or client) config (if any) from zk + */ + def fetchEntityConfig(zkClient: ZkClient, entityType: String, entity: String): Properties = { + val str: String = zkClient.readData(ZkUtils.getEntityConfigPath(entityType, entity), true) val props = new Properties() if(str != null) { Json.parseFull(str) match { @@ -326,9 +352,9 @@ object AdminUtils extends Logging { configTup match { case (k: String, v: String) => props.setProperty(k, v) - case _ => throw new IllegalArgumentException("Invalid topic config: " + str) + case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str) } - case _ => throw new IllegalArgumentException("Invalid topic config: " + str) + case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str) } case o => throw new IllegalArgumentException("Unexpected value in config: " + str) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala new file mode 100644 index 0000000..9d00363 --- /dev/null +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import joptsimple._ +import java.util.Properties +import kafka.server.ConfigType +import kafka.utils.{ZKStringSerializer, CommandLineUtils} +import org.I0Itec.zkclient.ZkClient +import scala.collection._ +import scala.collection.JavaConversions._ +import org.apache.kafka.common.utils.Utils + + +object ConfigCommand { + + def main(args: Array[String]): Unit = { + + val opts = new ConfigCommandOptions(args) + + if(args.length == 0) + CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topic/client) configs") + + // should have exactly one action + val actions = Seq(opts.alterOpt, opts.describeOpt).count(opts.options.has _) + if(actions != 1) + CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --describe, --alter") + + opts.checkArgs() + + val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) + + try { + if(opts.options.has(opts.alterOpt)) + alterConfig(zkClient, opts) + else if(opts.options.has(opts.describeOpt)) 1 + describeConfig(zkClient, opts) + } catch { + case e: Throwable => + println("Error while executing topic command " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + zkClient.close() + } + } + + def alterConfig(zkClient: ZkClient, opts: ConfigCommandOptions) { + val configsToBeAdded = parseConfigsToBeAdded(opts) + val 
configsToBeDeleted = parseConfigsToBeDeleted(opts) + val entityType = opts.options.valueOf(opts.entityType) + val entityName = opts.options.valueOf(opts.entityName) + + // compile the final set of configs + val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName) + configs.putAll(configsToBeAdded) + configsToBeDeleted.foreach(config => configs.remove(config)) + + if(entityType.equals(ConfigType.Topics)) { + AdminUtils.changeTopicConfig(zkClient, entityName, configs) + println("Updated config for topic: \"%s\".".format(entityName)) + } else { + AdminUtils.changeClientIdConfig(zkClient, entityName, configs) + println("Updated config for clientId: \"%s\".".format(entityName)) + } + } + + def describeConfig(zkClient: ZkClient, opts: ConfigCommandOptions) { + val entityType = opts.options.valueOf(opts.entityType) + val entityName = opts.options.valueOf(opts.entityName) + val configs = AdminUtils.fetchEntityConfig(zkClient, entityType, entityName) + println("Configs for %s:%s are %s" + .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) + } + + def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = { + val configsToBeAdded = opts.options.valuesOf(opts.addedConfig).map(_.split("""\s*=\s*""")) + require(configsToBeAdded.forall(config => config.length == 2), + "Invalid entity config: all configs to be added must be in the format \"key=val\".") + val props = new Properties + configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) + props + } + + def parseConfigsToBeDeleted(opts: ConfigCommandOptions): Seq[String] = { + if (opts.options.has(opts.deletedConfig)) { + val configsToBeDeleted = opts.options.valuesOf(opts.deletedConfig).map(_.trim()) + val propsToBeDeleted = new Properties + configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, "")) + configsToBeDeleted + } + else + Seq.empty + } + + class ConfigCommandOptions(args: Array[String]) { + val parser = new OptionParser + val 
zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") + val describeOpt = parser.accepts("describe", "List configs for the given entity.") + val entityType = parser.accepts("entityType", "Type of entity (topic/client)") + .withRequiredArg + .ofType(classOf[String]) + val entityName = parser.accepts("entityName", "Name of entity (topic name/client id)") + .withRequiredArg + .ofType(classOf[String]) + val addedConfig = parser.accepts("added-config", "Key Value pairs configs to add 'k1=v1,k2=v2'") + .withRequiredArg + .ofType(classOf[String]) + .withValuesSeparatedBy(',') + val deletedConfig = parser.accepts("deleted-config", "config keys to remove 'k1,k2'") + .withRequiredArg + .ofType(classOf[String]) + .withValuesSeparatedBy(',') + val helpOpt = parser.accepts("help", "Print usage information.") + val options = parser.parse(args : _*) + + val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addedConfig, deletedConfig, helpOpt) + + def checkArgs() { + // check required args + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType, entityName) + CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt)) + CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt)) + if(options.has(alterOpt)) { + val isAddedPresent: Boolean = options.has(addedConfig) + val isDeletedPresent: Boolean = options.has(deletedConfig) + if(! isAddedPresent && ! isDeletedPresent) + throw new IllegalArgumentException("At least one of --added-config or --deleted-config must be specified with --alter") + } + val entityTypeVal = options.valueOf(entityType) + if(! entityTypeVal.equals(ConfigType.Topics) && ! 
entityTypeVal.equals(ConfigType.Clients)) { + throw new IllegalArgumentException("entityType must be 'client' or 'topic'") + } + } + } + +} diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 8e6f186..8bcfe03 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -20,6 +20,7 @@ package kafka.admin import joptsimple._ import java.util.Properties import kafka.common.{Topic, AdminCommandFailedException} +import kafka.utils.CommandLineUtils import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index 64ecb49..bff9d2f 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -16,6 +16,9 @@ */ package kafka.controller + +import kafka.server.ConfigType + import collection.mutable import kafka.utils.{ShutdownableThread, Logging, ZkUtils} import kafka.utils.CoreUtils._ @@ -284,7 +287,7 @@ class TopicDeletionManager(controller: KafkaController, topicsToBeDeleted -= topic partitionsToBeDeleted.retain(_.topic != topic) controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) - controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic)) + controllerContext.zkClient.deleteRecursive(ZkUtils.getEntityConfigPath(ConfigType.Topics, topic)) controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic)) controllerContext.removeTopic(topic) } diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala new file mode 100644 index 0000000..e452515 --- /dev/null +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -0,0 +1,56 @@ +package kafka.server + + 
+import java.util.Properties + +import kafka.log.{LogConfig, LogManager} +import org.apache.kafka.common.metrics.Metrics + + +/** + * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager + */ +trait ConfigHandler { + def processConfigChanges(entityName : String, value : Properties) +} + +/** + * The TopicConfigHandler will process topic config changes in ZK. + * The callback provides the topic name and the full properties set read from ZK + */ +class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler{ + + def processConfigChanges(topic : String, topicConfig : Properties) { + System.out.println("Received change " + topic + ", config " + topicConfig) + + val logs = logManager.logsByTopicPartition.toBuffer + val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) + + if (logsByTopic.contains(topic)) + { + System.out.println("Chaning log config") + + /* combine the default properties with the overrides in zk to create the new LogConfig */ + val props = new Properties(logManager.defaultConfig.toProps) + props.putAll(topicConfig) + val logConfig = LogConfig.fromProps(props) + for (log <- logsByTopic(topic)) + log.config = logConfig + } + } +} + +/** + * The ClientIdConfigHandler will process clientId config changes in ZK. + * The callback provides the clientId and the full properties set read from ZK. + * This implementation does nothing currently. 
In the future, it will change quotas per client + */ +class ClientIdConfigHandler extends ConfigHandler { + @volatile var clientId: String = null + @volatile var clientConfig: Properties = null + + def processConfigChanges(clientId : String, clientConfig : Properties): Unit = { + this.clientId = clientId + this.clientConfig = clientConfig + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala new file mode 100644 index 0000000..93080a9 --- /dev/null +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + + +import scala.collection._ +import kafka.utils._ +import kafka.admin.AdminUtils +import org.I0Itec.zkclient.{IZkChildListener, ZkClient} + +object ConfigType { + val Topics = "topics" + val Clients = "clients" +} + +class DynamicConfigManager(private val zkClient: ZkClient, + private val configHandler : Map[String, ConfigHandler], + private val changeExpirationMs: Long = 15*60*1000, + private val time: Time = SystemTime) extends Logging { + private var lastExecutedChange = -1L + + /** + * Begin watching for config changes + */ + def startup() { + ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.EntityConfigChangesPath) + zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener) + processAllConfigChanges() + } + + /** + * Process all config changes + */ + private def processAllConfigChanges() { + val configChanges = zkClient.getChildren(ZkUtils.EntityConfigChangesPath) + import JavaConversions._ + processConfigChanges((configChanges: mutable.Buffer[String]).sorted) + } + + /** + * Process the given list of config changes + */ + private def processConfigChanges(notifications: Seq[String]) { + if (notifications.size > 0) { + info("Processing config change notification(s)...") + val now = time.milliseconds + for (notification <- notifications) { + val changeId = changeNumber(notification) + + if (changeId > lastExecutedChange) { + val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification + + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode) + processNotification(jsonOpt) + } + lastExecutedChange = changeId + } + purgeObsoleteNotifications(now, notifications) + } + } + + def processNotification(jsonOpt: Option[String]) = { + if(jsonOpt.isDefined) { + val json = jsonOpt.get + Json.parseFull(json) match { + case None => // there are no config overrides + case Some(mapAnon: Map[_, _]) => + val map = mapAnon collect + { case (k: String, v: Any) => k -> v } + 
require(map("version") == 1) + + val entityType = map.get("entityType") match { + case Some(ConfigType.Topics) => ConfigType.Topics + case Some(ConfigType.Clients) => ConfigType.Clients + case _ => throw new IllegalArgumentException("Invalid entityType config. Must be either 'client' or 'topic'") + } + + val entity = map.get("entityName") match { + case Some(value: String) => value + case _ => throw new IllegalArgumentException("Value not specified in config change notification " + json) + } + if (configHandler.contains(entityType)) + configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkClient, entityType, entity)) + else + warn("Cannot process config changes for entity " + entity) + + case o => throw new IllegalArgumentException("Unexpected value in config: " + o) + } + } + } + + private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) { + for(notification <- notifications.sorted) { + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.EntityConfigChangesPath + "/" + notification) + if(jsonOpt.isDefined) { + val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notification + if (now - stat.getCtime > changeExpirationMs) { + debug("Purging config change notification " + notification) + ZkUtils.deletePath(zkClient, changeZnode) + } else { + return + } + } + } + } + + /* get the change number from a change notification znode */ + private def changeNumber(name: String): Long = name.substring(AdminUtils.EntityConfigChangeZnodePrefix.length).toLong + + /** + * A listener that applies config changes to logs + */ + object ConfigChangeListener extends IZkChildListener { + override def handleChildChange(path: String, chillins: java.util.List[String]) { + try { + import JavaConversions._ + processConfigChanges(chillins: mutable.Buffer[String]) + } catch { + case e: Exception => error("Error processing config change:", e) + } + } + } +} \ No newline at end of file diff --git 
a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index ea6d165..caa38b6 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -26,7 +26,9 @@ import java.util.concurrent._ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File -import collection.mutable +import org.apache.kafka.common.metrics.{JmxReporter, Metrics} + +import scala.collection.{JavaConversions, mutable} import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.{EndPoint, Broker} @@ -60,7 +62,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var replicaManager: ReplicaManager = null - var topicConfigManager: TopicConfigManager = null + var dynamicConfigHandlers: Map[String, ConfigHandler] = null + var dynamicConfigManager: DynamicConfigManager = null + val metrics: Metrics = new Metrics() var consumerCoordinator: ConsumerCoordinator = null @@ -152,9 +156,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Mx4jLoader.maybeLoad() - /* start topic config manager */ - topicConfigManager = new TopicConfigManager(zkClient, logManager) - topicConfigManager.startup() + /* start dynamic config manager */ + dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topics -> new TopicConfigHandler(logManager), + ConfigType.Clients -> new ClientIdConfigHandler) + dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers) + dynamicConfigManager.startup() /* tell everyone we are alive */ val listeners = config.advertisedListeners.map {case(protocol, endpoint) => @@ -184,6 +190,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg } } + private def initMetrics() : Metrics = { + new Metrics() + } + private def initZk(): ZkClient = { info("Connecting to zookeeper on " + config.zkConnect) diff --git 
a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index b675a7e..527cb7c 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -68,8 +68,8 @@ class TopicConfigManager(private val zkClient: ZkClient, * Begin watching for config changes */ def startup() { - ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.TopicConfigChangesPath) - zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener) + ZkUtils.makeSurePersistentPathExists(zkClient, null)//ZkUtils.TopicConfigChangesPath) + //zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener) processAllConfigChanges() } @@ -77,7 +77,8 @@ class TopicConfigManager(private val zkClient: ZkClient, * Process all config changes */ private def processAllConfigChanges() { - val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath) + //val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath) + val configChanges = zkClient.getChildren(null) import JavaConversions._ processConfigChanges((configChanges: mutable.Buffer[String]).sorted) } @@ -94,7 +95,7 @@ class TopicConfigManager(private val zkClient: ZkClient, for (notification <- notifications) { val changeId = changeNumber(notification) if (changeId > lastExecutedChange) { - val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification + val changeZnode = null //ZkUtils.ATopicConfigChangesPath //+ "/" //+ notification val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode) if(jsonOpt.isDefined) { val json = jsonOpt.get @@ -118,9 +119,9 @@ class TopicConfigManager(private val zkClient: ZkClient, private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) { for(notification <- notifications.sorted) { - val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.TopicConfigChangesPath + "/" + 
notification) + val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, null) //ZkUtils.TopicConfigChangesPath + "/" + notification) if(jsonOpt.isDefined) { - val changeZnode = ZkUtils.TopicConfigChangesPath + "/" + notification + val changeZnode = null //ZkUtils.TopicConfigChangesPath + "/" + notification if (now - stat.getCtime > changeExpirationMs) { debug("Purging config change notification " + notification) ZkUtils.deletePath(zkClient, changeZnode) @@ -132,7 +133,7 @@ class TopicConfigManager(private val zkClient: ZkClient, } /* get the change number from a change notification znode */ - private def changeNumber(name: String): Long = name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong + private def changeNumber(name: String): Long = 1 //name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong /** * A listener that applies config changes to logs diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 2618dd3..976ff63 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -19,6 +19,7 @@ package kafka.utils import kafka.cluster._ import kafka.consumer.{ConsumerThreadId, TopicCount} +import kafka.server.ConfigType import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError, ZkBadVersionException} @@ -39,14 +40,14 @@ object ZkUtils extends Logging { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" - val TopicConfigPath = "/config/topics" - val TopicConfigChangesPath = "/config/changes" val ControllerPath = "/controller" val ControllerEpochPath = "/controller_epoch" val ReassignPartitionsPath = "/admin/reassign_partitions" val DeleteTopicsPath = "/admin/delete_topics" val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" val BrokerSequenceIdPath = "/brokers/seqid" 
+ val EntityConfigPath = "/config" + val EntityConfigChangesPath = "/config/changes" def getTopicPath(topic: String): String = { BrokerTopicsPath + "/" + topic @@ -56,8 +57,11 @@ object ZkUtils extends Logging { getTopicPath(topic) + "/partitions" } - def getTopicConfigPath(topic: String): String = - TopicConfigPath + "/" + topic + def getEntityConfigRootPath(entityType: String): String = + EntityConfigPath + "/" + entityType + + def getEntityConfigPath(entityType: String, entity: String): String = + getEntityConfigRootPath(entityType) + "/" + entity def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic @@ -92,8 +96,14 @@ object ZkUtils extends Logging { } def setupCommonPaths(zkClient: ZkClient) { - for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, - DeleteTopicsPath, BrokerSequenceIdPath)) + for(path <- Seq(ConsumersPath, + BrokerIdsPath, + BrokerTopicsPath, + EntityConfigChangesPath, + ZkUtils.getEntityConfigRootPath(ConfigType.Topics), + ZkUtils.getEntityConfigRootPath(ConfigType.Clients), + DeleteTopicsPath, + BrokerSequenceIdPath)) makeSurePersistentPathExists(zkClient, path) } diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index efb2f8e..f5ba8de 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -25,7 +25,7 @@ import kafka.log._ import kafka.zk.ZooKeeperTestHarness import kafka.utils.{Logging, ZkUtils, TestUtils} import kafka.common.{TopicExistsException, TopicAndPartition} -import kafka.server.{KafkaServer, KafkaConfig} +import kafka.server.{ConfigType, KafkaServer, KafkaConfig} import java.io.File import TestUtils._ @@ -393,12 +393,16 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { checkConfig(maxMessageSize, retentionMs) // now double the config values for the topic and check that it is applied + 
val newConfig: Properties = makeConfig(2*maxMessageSize, 2 * retentionMs) AdminUtils.changeTopicConfig(server.zkClient, topic, makeConfig(2*maxMessageSize, 2 * retentionMs)) checkConfig(2*maxMessageSize, 2 * retentionMs) + + // Verify that the same config can be read from ZK + val configInZk = AdminUtils.fetchEntityConfig(server.zkClient, ConfigType.Topics, topic) + assertEquals(newConfig, configInZk) } finally { server.shutdown() server.config.logDirs.foreach(CoreUtils.rm(_)) } } - } diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala new file mode 100644 index 0000000..11cb35b --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.admin + +import junit.framework.Assert._ +import kafka.admin.ConfigCommand.ConfigCommandOptions +import org.junit.Test +import org.scalatest.junit.JUnit3Suite +import kafka.utils.Logging +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import kafka.server.{ConfigType, OffsetManager, KafkaConfig} +import kafka.admin.TopicCommand.TopicCommandOptions +import kafka.utils.ZkUtils + +class ConfigCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { + @Test + def testArgumentParse() { + // Should parse correctly + var createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entityName", "x", + "--entityType", "clients", + "--describe")) + createOpts.checkArgs() + + // For --alter and added config + createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entityName", "x", + "--entityType", "clients", + "--alter", + "--added-config", "a=b,c=d")) + createOpts.checkArgs() + + // For alter and deleted config + createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entityName", "x", + "--entityType", "clients", + "--alter", + "--deleted-config", "a,b,c")) + createOpts.checkArgs() + + // For alter and both added, deleted config + createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entityName", "x", + "--entityType", "clients", + "--alter", + "--added-config", "a=b,c=d", + "--deleted-config", "a")) + createOpts.checkArgs() + val addedProps = ConfigCommand.parseConfigsToBeAdded(createOpts) + assertEquals(2, addedProps.size()) + assertEquals("b", addedProps.getProperty("a")) + assertEquals("d", addedProps.getProperty("c")) + + val deletedProps = ConfigCommand.parseConfigsToBeDeleted(createOpts) + assertEquals(1, deletedProps.size) + assertEquals("a", deletedProps(0)) + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 
c7136f2..7d14c6d 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -48,7 +48,7 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal)) // pre-create the topic config changes path to avoid a NoNodeException - ZkUtils.createPersistentPath(zkClient, ZkUtils.TopicConfigChangesPath) + ZkUtils.createPersistentPath(zkClient, ZkUtils.EntityConfigChangesPath) // modify the topic to add new partitions val numPartitionsModified = 3 diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 7877f6c..db2dc24 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -16,7 +16,12 @@ */ package kafka.server + +import java.util.Properties + import junit.framework.Assert._ +import org.I0Itec.zkclient.ZkClient +import org.easymock.{Capture, EasyMock} import org.junit.Test import kafka.integration.KafkaServerTestHarness import kafka.utils._ @@ -29,7 +34,9 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) @Test - def testConfigChange() { + def testTopicConfigChange() { + assertTrue("Should contain a ConfigHandler for topics", + this.servers(0).dynamicConfigHandlers.contains(ConfigType.Topics)) val oldVal = 100000 val newVal = 200000 val tp = TopicAndPartition("test", 0) @@ -45,6 +52,24 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { } } + // For now client config changes do not do anything. 
Simply verify that the call was made + @Test + def testClientConfigChange() { + assertTrue("Should contain a ConfigHandler for clients", + this.servers(0).dynamicConfigHandlers.contains(ConfigType.Clients)) + val clientId = "testClient" + val props = new Properties() + props.put("a.b", "c") + props.put("x.y", "z") + AdminUtils.changeClientIdConfig(zkClient, clientId, props) + TestUtils.retry(10000) { + val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Clients).asInstanceOf[ClientIdConfigHandler] + assertEquals("ClientId should be set to testClient", clientId, configHandler.clientId) + assertEquals("c", configHandler.clientConfig.getProperty("a.b")) + assertEquals("z", configHandler.clientConfig.getProperty("x.y")) + } + } + @Test def testConfigChangeOnNonExistingTopic() { val topic = TestUtils.tempTopic @@ -56,4 +81,55 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { } } + @Test + def testProcessNotification { + val props = new Properties() + props.put("a.b", "10") + // create nice mock since we don't particularly care about zkclient calls + val entityArgument = new Capture[String]() + val propertiesArgument = new Capture[Properties]() + val handler = EasyMock.createNiceMock(classOf[ConfigHandler]) + handler.processConfigChanges( + EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])), + EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties]))) + EasyMock.expectLastCall() + EasyMock.replay(handler) + + val configManager = new DynamicConfigManager(zkClient, Map("Topic" -> handler)) + // Is ignored + configManager.processNotification(Some("not json")) + + // Incorrect Map. No version + try { + val jsonMap = Map("v" -> 1, "x" -> 2) + configManager.processNotification(Some(Json.encode(jsonMap))) + fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) + } + catch { + case t: Throwable => + } + // Version is provided.
EntityType is incorrect + try { + val jsonMap = Map("version" -> 1, "entityType" -> "garbage", "entityName" -> "x") + configManager.processNotification(Some(Json.encode(jsonMap))) + fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) + } + catch { + case t: Throwable => + } + + // EntityName isn't provided + try { + val jsonMap = Map("version" -> 1, "entityType" -> ConfigType.Clients) + configManager.processNotification(Some(Json.encode(jsonMap))) + fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) + } + catch { + case t: Throwable => + } + + // Everything is provided + val jsonMap = Map("version" -> 1, "entityType" -> ConfigType.Clients, "entityName" -> "x") + configManager.processNotification(Some(Json.encode(jsonMap))) + } } \ No newline at end of file -- 1.7.12.4 From 2a552030e66423e9016b6efe1a30949259ce4097 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Thu, 21 May 2015 09:32:09 -0700 Subject: [PATCH 2/5] Some fixes --- core/src/main/scala/kafka/admin/AdminUtils.scala | 9 +-------- core/src/main/scala/kafka/admin/ConfigCommand.scala | 3 +++ core/src/main/scala/kafka/admin/TopicCommand.scala | 6 +++--- core/src/main/scala/kafka/cluster/Partition.scala | 5 +++-- core/src/main/scala/kafka/controller/KafkaController.scala | 4 ++-- .../main/scala/kafka/controller/PartitionLeaderSelector.scala | 6 +++--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | 4 ++-- core/src/main/scala/kafka/server/TopicConfigManager.scala | 2 +- core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala | 6 +++--- 9 files changed, 21 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index d0dac2b..4ddff75 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -328,13 +328,6 @@ object AdminUtils extends Logging { } /** - * Read the topic 
config (if any) from zk - */ - def fetchTopicConfig(zkClient: ZkClient, topic: String): Properties = { - fetchEntityConfig(zkClient, ConfigType.Topics, topic) - } - - /** * Read the entity (topic or client) config (if any) from zk */ def fetchEntityConfig(zkClient: ZkClient, entityType: String, entity: String): Properties = { @@ -364,7 +357,7 @@ object AdminUtils extends Logging { } def fetchAllTopicConfigs(zkClient: ZkClient): Map[String, Properties] = - ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchTopicConfig(zkClient, topic))).toMap + ZkUtils.getAllTopics(zkClient).map(topic => (topic, fetchEntityConfig(zkClient, ConfigType.Topics, topic))).toMap def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata = fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker]) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 9d00363..76a445d 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -27,6 +27,9 @@ import scala.collection.JavaConversions._ import org.apache.kafka.common.utils.Utils +/** + * This script can be used to change configs for topics/clients dynamically + */ object ConfigCommand { def main(args: Array[String]): Unit = { diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 8bcfe03..2a2271c 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -28,7 +28,7 @@ import scala.collection._ import scala.collection.JavaConversions._ import kafka.log.LogConfig import kafka.consumer.Whitelist -import kafka.server.OffsetManager +import kafka.server.{ConfigType, OffsetManager} import org.apache.kafka.common.utils.Utils @@ -101,7 +101,7 @@ object TopicCommand { println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt))) 
} topics.foreach { topic => - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) + val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topics, topic) if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { val configsToBeAdded = parseTopicConfigsToBeAdded(opts) val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) @@ -174,7 +174,7 @@ object TopicCommand { val describePartitions: Boolean = !reportOverriddenConfigs val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1) if (describeConfigs) { - val configs = AdminUtils.fetchTopicConfig(zkClient, topic) + val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topics, topic) if (!reportOverriddenConfigs || configs.size() != 0) { val numPartitions = topicPartitionAssignment.size val replicationFactor = topicPartitionAssignment.head._2.size diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 730a232..b989ada 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,7 +22,7 @@ import kafka.utils.CoreUtils.{inReadLock,inWriteLock} import kafka.admin.AdminUtils import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, LogReadResult, ReplicaManager} +import kafka.server._ import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -86,7 +86,8 @@ class Partition(val topic: String, case Some(replica) => replica case None => if (isReplicaLocal(replicaId)) { - val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic)) + val config = LogConfig.fromProps(logManager.defaultConfig.toProps, + AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topics, topic)) val log = 
logManager.createLog(TopicAndPartition(topic, partitionId), config) val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) val offsetMap = checkpoint.read diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 69bba24..e143bfb 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1013,8 +1013,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can // eventually be restored as the leader. - if (newIsr.isEmpty && !LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(zkClient, - topicAndPartition.topic)).uncleanLeaderElectionEnable) { + if (newIsr.isEmpty && !LogConfig.fromProps(config.toProps, AdminUtils.fetchEntityConfig(zkClient, + ConfigType.Topics, topicAndPartition.topic)).uncleanLeaderElectionEnable) { info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition)) newIsr = leaderAndIsr.isr } diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 3b15ab4..89b4eeb 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -21,7 +21,7 @@ import kafka.api.LeaderAndIsr import kafka.log.LogConfig import kafka.utils.Logging import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException} -import kafka.server.KafkaConfig +import kafka.server.{ConfigType, KafkaConfig} trait 
PartitionLeaderSelector { @@ -61,8 +61,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi case true => // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration // for unclean leader election. - if (!LogConfig.fromProps(config.toProps, AdminUtils.fetchTopicConfig(controllerContext.zkClient, - topicAndPartition.topic)).uncleanLeaderElectionEnable) { + if (!LogConfig.fromProps(config.toProps, AdminUtils.fetchEntityConfig(controllerContext.zkClient, + ConfigType.Topics, topicAndPartition.topic)).uncleanLeaderElectionEnable) { throw new NoReplicaOnlineException(("No broker in ISR for partition " + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(","))) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index b31b432..118773a 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -90,8 +90,8 @@ class ReplicaFetcherThread(name:String, // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. 
- if (!LogConfig.fromProps(brokerConfig.toProps, AdminUtils.fetchTopicConfig(replicaMgr.zkClient, - topicAndPartition.topic)).uncleanLeaderElectionEnable) { + if (!LogConfig.fromProps(brokerConfig.toProps, AdminUtils.fetchEntityConfig(replicaMgr.zkClient, + ConfigType.Topics, topicAndPartition.topic)).uncleanLeaderElectionEnable) { // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur. fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) + " Current leader %d's latest offset %d is less than replica %d's latest offset %d" diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index 527cb7c..6bde491 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -103,7 +103,7 @@ class TopicConfigManager(private val zkClient: ZkClient, if (logsByTopic.contains(topic)) { /* combine the default properties with the overrides in zk to create the new LogConfig */ val props = new Properties(logManager.defaultConfig.toProps) - props.putAll(AdminUtils.fetchTopicConfig(zkClient, topic)) + props.putAll(AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topics, topic)) val logConfig = LogConfig.fromProps(props) for (log <- logsByTopic(topic)) log.config = logConfig diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 7d14c6d..96f4cf8 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite import kafka.utils.Logging import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness -import kafka.server.{OffsetManager, KafkaConfig} +import kafka.server.{ConfigType, OffsetManager, KafkaConfig} import 
kafka.admin.TopicCommand.TopicCommandOptions import kafka.utils.ZkUtils @@ -43,7 +43,7 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin "--config", cleanupKey + "=" + cleanupVal, "--topic", topic)) TopicCommand.createTopic(zkClient, createOpts) - val props = AdminUtils.fetchTopicConfig(zkClient, topic) + val props = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topics, topic) assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey)) assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal)) @@ -56,7 +56,7 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin "--config", cleanupKey + "=" + cleanupVal, "--topic", topic)) TopicCommand.alterTopic(zkClient, alterOpts) - val newProps = AdminUtils.fetchTopicConfig(zkClient, topic) + val newProps = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topics, topic) assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey)) assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal)) } -- 1.7.12.4 From 0c2ce529c116ac2d53dbb4e7d0ee11b2e49a6618 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Thu, 21 May 2015 10:37:55 -0700 Subject: [PATCH 3/5] KAFKA-2205 --- .../main/scala/kafka/server/ConfigHandler.scala | 2 - .../scala/kafka/server/DynamicConfigManager.scala | 44 +++++- .../scala/kafka/server/TopicConfigManager.scala | 152 --------------------- 3 files changed, 43 insertions(+), 155 deletions(-) delete mode 100644 core/src/main/scala/kafka/server/TopicConfigManager.scala diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index e452515..aa00487 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -4,8 +4,6 @@ package kafka.server import 
java.util.Properties import kafka.log.{LogConfig, LogManager} -import org.apache.kafka.common.metrics.Metrics - /** * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index 93080a9..c6fc0ec 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -17,17 +17,59 @@ package kafka.server +import kafka.utils.Json +import kafka.utils.Logging +import kafka.utils.SystemTime +import kafka.utils.Time +import kafka.utils.ZkUtils import scala.collection._ -import kafka.utils._ import kafka.admin.AdminUtils import org.I0Itec.zkclient.{IZkChildListener, ZkClient} + +/** + * Represents all the entities that can be configured via ZK + */ object ConfigType { val Topics = "topics" val Clients = "clients" } +/** + * This class initiates and carries out config changes for all entities defined in ConfigType. + * + * It works as follows. + * + * Config is stored under the path: /config/entityType/entityName + * E.g. /config/topics/ and /config/clients/ + * This znode stores the overrides for this entity (but no defaults) in properties format. + * + * To avoid watching all topics for changes instead we have a notification path + * /config/changes + * The DynamicConfigManager has a child watch on this path. + * + * To update a config we first update the config properties. Then we create a new sequential + * znode under the change path which contains the name of the entityType and entityName that was updated, say + * /config/changes/config_change_13321 + * The sequential znode contains data in this format: {"version" : 1, "entityType":"topic/client", "entityName" : "topic_name/client_id"} + * This is just a notification--the actual config change is stored only once under the /config/entityType/entityName path. 
+ * + * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications. + * It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds + * it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification. + * For any new changes it reads the new configuration, combines it with the defaults, and updates the existing config. + * + * Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is + * down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that + * if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the + * broker reads the config the both changes may have been made). In this case the broker would needlessly refresh the config twice, + * but that is harmless. + * + * On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions + * on startup where a change might be missed between the initial config load and registering for change notifications. + * + */ class DynamicConfigManager(private val zkClient: ZkClient, private val configHandler : Map[String, ConfigHandler], private val changeExpirationMs: Long = 15*60*1000, diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala deleted file mode 100644 index 6bde491..0000000 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import java.util.Properties -import scala.collection._ -import kafka.log._ -import kafka.utils._ -import kafka.admin.AdminUtils -import org.I0Itec.zkclient.{IZkChildListener, ZkClient} - -/** - * This class initiates and carries out topic config changes. - * - * It works as follows. - * - * Config is stored under the path - * /config/topics/ - * This znode stores the topic-overrides for this topic (but no defaults) in properties format. - * - * To avoid watching all topics for changes instead we have a notification path - * /config/changes - * The TopicConfigManager has a child watch on this path. - * - * To update a topic config we first update the topic config properties. Then we create a new sequential - * znode under the change path which contains the name of the topic that was updated, say - * /config/changes/config_change_13321 - * This is just a notification--the actual config change is stored only once under the /config/topics/ path. - * - * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications. - * It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds - * it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification. 
- * For any new changes it reads the new configuration, combines it with the defaults, and updates the log config - * for all logs for that topic (if any) that it has. - * - * Note that config is always read from the config path in zk, the notification is just a trigger to do so. So if a broker is - * down and misses a change that is fine--when it restarts it will be loading the full config anyway. Note also that - * if there are two consecutive config changes it is possible that only the last one will be applied (since by the time the - * broker reads the config the both changes may have been made). In this case the broker would needlessly refresh the config twice, - * but that is harmless. - * - * On restart the config manager re-processes all notifications. This will usually be wasted work, but avoids any race conditions - * on startup where a change might be missed between the initial config load and registering for change notifications. - * - */ -class TopicConfigManager(private val zkClient: ZkClient, - private val logManager: LogManager, - private val changeExpirationMs: Long = 15*60*1000, - private val time: Time = SystemTime) extends Logging { - private var lastExecutedChange = -1L - - /** - * Begin watching for config changes - */ - def startup() { - ZkUtils.makeSurePersistentPathExists(zkClient, null)//ZkUtils.TopicConfigChangesPath) - //zkClient.subscribeChildChanges(ZkUtils.TopicConfigChangesPath, ConfigChangeListener) - processAllConfigChanges() - } - - /** - * Process all config changes - */ - private def processAllConfigChanges() { - //val configChanges = zkClient.getChildren(ZkUtils.TopicConfigChangesPath) - val configChanges = zkClient.getChildren(null) - import JavaConversions._ - processConfigChanges((configChanges: mutable.Buffer[String]).sorted) - } - - /** - * Process the given list of config changes - */ - private def processConfigChanges(notifications: Seq[String]) { - if (notifications.size > 0) { - info("Processing config change 
notification(s)...") - val now = time.milliseconds - val logs = logManager.logsByTopicPartition.toBuffer - val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) - for (notification <- notifications) { - val changeId = changeNumber(notification) - if (changeId > lastExecutedChange) { - val changeZnode = null //ZkUtils.ATopicConfigChangesPath //+ "/" //+ notification - val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, changeZnode) - if(jsonOpt.isDefined) { - val json = jsonOpt.get - val topic = json.substring(1, json.length - 1) // hacky way to dequote - if (logsByTopic.contains(topic)) { - /* combine the default properties with the overrides in zk to create the new LogConfig */ - val props = new Properties(logManager.defaultConfig.toProps) - props.putAll(AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topics, topic)) - val logConfig = LogConfig.fromProps(props) - for (log <- logsByTopic(topic)) - log.config = logConfig - info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props)) - purgeObsoleteNotifications(now, notifications) - } - } - lastExecutedChange = changeId - } - } - } - } - - private def purgeObsoleteNotifications(now: Long, notifications: Seq[String]) { - for(notification <- notifications.sorted) { - val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(zkClient, null) //ZkUtils.TopicConfigChangesPath + "/" + notification) - if(jsonOpt.isDefined) { - val changeZnode = null //ZkUtils.TopicConfigChangesPath + "/" + notification - if (now - stat.getCtime > changeExpirationMs) { - debug("Purging config change notification " + notification) - ZkUtils.deletePath(zkClient, changeZnode) - } else { - return - } - } - } - } - - /* get the change number from a change notification znode */ - private def changeNumber(name: String): Long = 1 //name.substring(AdminUtils.TopicConfigChangeZnodePrefix.length).toLong - - /** - * A listener that applies config changes to logs - */ - object 
ConfigChangeListener extends IZkChildListener { - override def handleChildChange(path: String, chillins: java.util.List[String]) { - try { - import JavaConversions._ - processConfigChanges(chillins: mutable.Buffer[String]) - } catch { - case e: Exception => error("Error processing config change:", e) - } - } - } - -} \ No newline at end of file -- 1.7.12.4 From 8910e11ab0ee95ce2017a4ad0ad479434a3264ab Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Thu, 21 May 2015 10:49:06 -0700 Subject: [PATCH 4/5] KAFKA-2205 --- core/src/main/scala/kafka/server/ConfigHandler.scala | 4 ---- core/src/main/scala/kafka/server/KafkaServer.scala | 4 ---- 2 files changed, 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index aa00487..1ae056e 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -19,15 +19,11 @@ trait ConfigHandler { class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler{ def processConfigChanges(topic : String, topicConfig : Properties) { - System.out.println("Received change " + topic + ", config " + topicConfig) - val logs = logManager.logsByTopicPartition.toBuffer val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) if (logsByTopic.contains(topic)) { - System.out.println("Chaning log config") - /* combine the default properties with the overrides in zk to create the new LogConfig */ val props = new Properties(logManager.defaultConfig.toProps) props.putAll(topicConfig) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index caa38b6..d1be884 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -190,10 +190,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg } } - private def initMetrics() : 
Metrics = { - new Metrics() - } - private def initZk(): ZkClient = { info("Connecting to zookeeper on " + config.zkConnect) -- 1.7.12.4 From 306754b3697cf41f48e065d9703f424ee8235c09 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Wed, 1 Jul 2015 18:37:19 -0700 Subject: [PATCH 5/5] Addressing Jun's comments --- core/src/main/scala/kafka/admin/AdminUtils.scala | 30 ++++++++++++---------- .../src/main/scala/kafka/admin/ConfigCommand.scala | 16 ++++++------ core/src/main/scala/kafka/admin/TopicCommand.scala | 16 +++--------- .../main/scala/kafka/server/ConfigHandler.scala | 22 +++++++++------- .../scala/kafka/server/DynamicConfigManager.scala | 7 ++--- .../scala/unit/kafka/admin/ConfigCommandTest.scala | 16 ++++++------ .../scala/unit/kafka/admin/TopicCommandTest.scala | 4 +-- .../kafka/server/DynamicConfigChangeTest.scala | 18 ++++++------- 8 files changed, 63 insertions(+), 66 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 4ddff75..c58b7e6 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -102,14 +102,12 @@ object AdminUtils extends Logging { * @param numPartitions Number of partitions to be set * @param replicaAssignmentStr Manual replica assignment * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. 
Only used for testing - * @param config Pre-existing properties that should be preserved */ def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", - checkBrokerAvailable: Boolean = true, - config: Properties = new Properties) { + checkBrokerAvailable: Boolean = true) { val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic)) if (existingPartitionsReplicaList.size == 0) throw new AdminOperationException("The topic %s does not exist".format(topic)) @@ -136,7 +134,7 @@ object AdminUtils extends Logging { val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2) // add the new list partitionReplicaList ++= newPartitionReplicaList - AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, config, true) + AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true) } def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = { @@ -231,7 +229,7 @@ object AdminUtils extends Logging { val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig) } - + def createOrUpdateTopicPartitionAssignmentPathInZK(zkClient: ZkClient, topic: String, partitionReplicaAssignment: Map[Int, Seq[Int]], @@ -239,21 +237,24 @@ object AdminUtils extends Logging { update: Boolean = false) { // validate arguments Topic.validate(topic) - LogConfig.validate(config) require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.") val topicPath = ZkUtils.getTopicPath(topic) - if(!update && zkClient.exists(topicPath)) + if (!update && zkClient.exists(topicPath)) 
throw new TopicExistsException("Topic \"%s\" already exists.".format(topic)) partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment)) - - // write out the config if there is any, this isn't transactional with the partition assignments - writeEntityConfig(zkClient, ConfigType.Topics, topic, config) - + + // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported + if (!update) { + // write out the config if there is any, this isn't transactional with the partition assignments + LogConfig.validate(config) + writeEntityConfig(zkClient, ConfigType.Topics, topic, config) + } + // create the partition assignment writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update) } - + private def writeTopicPartitionAssignment(zkClient: ZkClient, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) { try { val zkPath = ZkUtils.getTopicPath(topic) @@ -312,7 +313,7 @@ object AdminUtils extends Logging { } def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = { - Map("version" -> 1, "entityType" -> entityType, "entityName" -> entityName) + Map("version" -> 1, "entity_type" -> entityType, "entity_name" -> entityName) } /** @@ -350,7 +351,8 @@ object AdminUtils extends Logging { case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str) } - case o => throw new IllegalArgumentException("Unexpected value in config: " + str) + case o => throw new IllegalArgumentException("Unexpected value in config:(%s), entity_type: (%s), entity: (%s)" + .format(str, entityType, entity)) } } props diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 76a445d..ccac1b8 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala 
@@ -39,11 +39,6 @@ object ConfigCommand { if(args.length == 0) CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topic/client) configs") - // should have exactly one action - val actions = Seq(opts.alterOpt, opts.describeOpt).count(opts.options.has _) - if(actions != 1) - CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --describe, --alter") - opts.checkArgs() val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) @@ -119,10 +114,10 @@ object ConfigCommand { .ofType(classOf[String]) val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") val describeOpt = parser.accepts("describe", "List configs for the given entity.") - val entityType = parser.accepts("entityType", "Type of entity (topic/client)") + val entityType = parser.accepts("entity-type", "Type of entity (topic/client)") .withRequiredArg .ofType(classOf[String]) - val entityName = parser.accepts("entityName", "Name of entity (topic name/client id)") + val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id)") .withRequiredArg .ofType(classOf[String]) val addedConfig = parser.accepts("added-config", "Key Value pairs configs to add 'k1=v1,k2=v2'") @@ -139,10 +134,15 @@ object ConfigCommand { val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addedConfig, deletedConfig, helpOpt) def checkArgs() { + // should have exactly one action + val actions = Seq(alterOpt, describeOpt).count(options.has _) + if(actions != 1) + CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter") + // check required args CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType, entityName) CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt)) - CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt)) + 
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addedConfig, deletedConfig)) if(options.has(alterOpt)) { val isAddedPresent: Boolean = options.has(addedConfig) val isDeletedPresent: Boolean = options.has(deletedConfig) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 2a2271c..e776335 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -101,16 +101,6 @@ object TopicCommand { println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt))) } topics.foreach { topic => - val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topics, topic) - if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { - val configsToBeAdded = parseTopicConfigsToBeAdded(opts) - val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) - // compile the final set of configs - configs.putAll(configsToBeAdded) - configsToBeDeleted.foreach(config => configs.remove(config)) - AdminUtils.changeTopicConfig(zkClient, topic, configs) - println("Updated config for topic \"%s\".".format(topic)) - } if(opts.options.has(opts.partitionsOpt)) { if (topic == OffsetManager.OffsetsTopicName) { throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") @@ -119,7 +109,7 @@ object TopicCommand { "logic or ordering of the messages will be affected") val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt) - AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr, config = configs) + AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr) println("Adding partitions succeeded!") } } @@ -259,7 +249,7 @@ object TopicCommand { .describedAs("topic") .ofType(classOf[String]) val nl = System.getProperty("line.separator") - val 
configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." + + val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." + "The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl + "See the Kafka documentation for full details on the topic configs.") .withRequiredArg @@ -306,6 +296,8 @@ object TopicCommand { CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt)) CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt)) CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt)) + // Topic configs cannot be changed with alterTopic + CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(configOpt, deleteConfigOpt)) if(options.has(createOpt)) CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt)) CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt, diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 1ae056e..ee3c672 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -2,8 +2,13 @@ package kafka.server import java.util.Properties +import java.util.concurrent.ConcurrentHashMap -import kafka.log.{LogConfig, LogManager} +import kafka.common.TopicAndPartition +import kafka.log.{Log, LogConfig, LogManager} + +import scala.collection.{concurrent, mutable} +import scala.collection.JavaConversions._ /** * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager @@ -19,11 +24,12 @@ trait ConfigHandler { class TopicConfigHandler(private val logManager: 
LogManager) extends ConfigHandler{ def processConfigChanges(topic : String, topicConfig : Properties) { - val logs = logManager.logsByTopicPartition.toBuffer - val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) + val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer + val logsByTopic: Map[String, mutable.Buffer[Log]] = logs.groupBy{ case k: (TopicAndPartition, Log) => k._1.topic } + .mapValues{ case v: mutable.Buffer[(TopicAndPartition, Log)] => v.map(_._2) } - if (logsByTopic.contains(topic)) - { + System.out.println(logsByTopic) + if (logsByTopic.contains(topic)) { /* combine the default properties with the overrides in zk to create the new LogConfig */ val props = new Properties(logManager.defaultConfig.toProps) props.putAll(topicConfig) @@ -40,11 +46,9 @@ class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandl * This implementation does nothing currently. In the future, it will change quotas per client */ class ClientIdConfigHandler extends ConfigHandler { - @volatile var clientId: String = null - @volatile var clientConfig: Properties = null + val configMap: concurrent.Map[String, Properties] = new ConcurrentHashMap[String, Properties]() def processConfigChanges(clientId : String, clientConfig : Properties): Unit = { - this.clientId = clientId - this.clientConfig = clientConfig + configMap.put(clientId, clientConfig) } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index c6fc0ec..3295871 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -120,19 +120,20 @@ class DynamicConfigManager(private val zkClient: ZkClient, if(jsonOpt.isDefined) { val json = jsonOpt.get Json.parseFull(json) match { - case None => // there are no config overrides + case None => // There are no config 
overrides. + // Ignore non-json notifications because they can be from the deprecated TopicConfigManager case Some(mapAnon: Map[_, _]) => val map = mapAnon collect { case (k: String, v: Any) => k -> v } require(map("version") == 1) - val entityType = map.get("entityType") match { + val entityType = map.get("entity_type") match { case Some(ConfigType.Topics) => ConfigType.Topics case Some(ConfigType.Clients) => ConfigType.Clients case _ => throw new IllegalArgumentException("Invalid entityType config. Must be either 'client' or 'topic'") } - val entity = map.get("entityName") match { + val entity = map.get("entity_name") match { case Some(value: String) => value case _ => throw new IllegalArgumentException("Value not specified in config change notification " + json) } diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index 11cb35b..90b6452 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -32,31 +32,31 @@ class ConfigCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggi def testArgumentParse() { // Should parse correctly var createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, - "--entityName", "x", - "--entityType", "clients", + "--entity-name", "x", + "--entity-type", "clients", "--describe")) createOpts.checkArgs() // For --alter and added config createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, - "--entityName", "x", - "--entityType", "clients", + "--entity-name", "x", + "--entity-type", "clients", "--alter", "--added-config", "a=b,c=d")) createOpts.checkArgs() // For alter and deleted config createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, - "--entityName", "x", - "--entityType", "clients", + "--entity-name", "x", + "--entity-type", "clients", "--alter", "--deleted-config", "a,b,c")) createOpts.checkArgs() // 
For alter and both added, deleted config createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, - "--entityName", "x", - "--entityType", "clients", + "--entity-name", "x", + "--entity-type", "clients", "--alter", "--added-config", "a=b,c=d", "--deleted-config", "a")) diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 96f4cf8..391d71d 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -52,9 +52,7 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin // modify the topic to add new partitions val numPartitionsModified = 3 - val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, - "--config", cleanupKey + "=" + cleanupVal, - "--topic", topic)) + val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic)) TopicCommand.alterTopic(zkClient, alterOpts) val newProps = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topics, topic) assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey)) diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index db2dc24..214d04c 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -20,7 +20,6 @@ package kafka.server import java.util.Properties import junit.framework.Assert._ -import org.I0Itec.zkclient.ZkClient import org.easymock.{Capture, EasyMock} import org.junit.Test import kafka.integration.KafkaServerTestHarness @@ -64,9 +63,10 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { AdminUtils.changeClientIdConfig(zkClient, clientId, props) 
TestUtils.retry(10000) { val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Clients).asInstanceOf[ClientIdConfigHandler] - assertEquals("ClientId should be set to testClient", clientId, configHandler.clientId) - assertEquals("c", configHandler.clientConfig.getProperty("a.b")) - assertEquals("z", configHandler.clientConfig.getProperty("x.y")) + assertTrue("ClientId testClient must exist", configHandler.configMap.contains(clientId)) + assertEquals("ClientId testClient must be the only override", 1, configHandler.configMap.size) + assertEquals("c", configHandler.configMap(clientId).getProperty("a.b")) + assertEquals("z", configHandler.configMap(clientId).getProperty("x.y")) } } @@ -95,8 +95,8 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { EasyMock.expectLastCall() EasyMock.replay(handler) - val configManager = new DynamicConfigManager(zkClient, Map("Topic" -> handler)) - // Is ignored + val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topics -> handler)) + // Notifications created using the old TopicConfigManager are ignored. configManager.processNotification(Some("not json")) // Incorrect Map. No version @@ -110,7 +110,7 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { } // Version is provided. 
EntityType is incorrect try { - val jsonMap = Map("version" -> 1, "entityType" -> "garbage", "entityName" -> "x") + val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x") configManager.processNotification(Some(Json.encode(jsonMap))) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } @@ -120,7 +120,7 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { // EntityName isn't provided try { - val jsonMap = Map("version" -> 1, "entityType" -> ConfigType.Clients) + val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Clients) configManager.processNotification(Some(Json.encode(jsonMap))) fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap) } @@ -129,7 +129,7 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { } // Everything is provided - val jsonMap = Map("version" -> 1, "entityType" -> ConfigType.Clients, "entityName" -> "x") + val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Clients, "entity_name" -> "x") configManager.processNotification(Some(Json.encode(jsonMap))) } } \ No newline at end of file -- 1.7.12.4