diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 8107a64..9ccd32a 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -201,13 +201,30 @@ object AdminUtils extends Logging { /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers */ - def changeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { - LogConfig.validate(config) + def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) { if(!topicExists(zkClient, topic)) throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) - + + // figure out if config overrides need to be added or removed + val allConfigs = fetchTopicConfig(zkClient, topic) + allConfigs.putAll(configs) + // remove the topic overrides + val configKeys = allConfigs.keySet() + val configIterator = configKeys.iterator() + val configsToBeRemoved = new Properties() + while(configIterator.hasNext) { + val key = configIterator.next().asInstanceOf[String] + val value = allConfigs.getProperty(key) + if(value.isEmpty) { + configIterator.remove() + configsToBeRemoved.put(key, value) + } + } + LogConfig.validate(allConfigs) + LogConfig.validateNames(configsToBeRemoved) + // write the new config--may not exist if there were previously no overrides - writeTopicConfig(zkClient, topic, config) + writeTopicConfig(zkClient, topic, allConfigs) // create the change notification zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic)) @@ -217,14 +234,12 @@ object AdminUtils extends Logging { * Write out the topic config to zk, if there is any */ private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { - if(config.size > 0) { - val configMap: mutable.Map[String, String] = { - import JavaConversions._ - config - } - val map = Map("version" -> 1, "config" -> configMap) - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map)) + val configMap: mutable.Map[String, String] = { + import JavaConversions._ + config } + val map = Map("version" -> 1, "config" -> configMap) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map)) } /** diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 56f3177..51a9126 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -79,7 +79,7 @@ object TopicCommand { def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) val topic = opts.options.valueOf(opts.topicOpt) - if(opts.options.has(opts.configOpt)) { + if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { val configs = parseTopicConfigs(opts) AdminUtils.changeTopicConfig(zkClient, topic, configs) println("Updated config for topic \"%s\".".format(topic)) @@ -148,13 +148,20 @@ object TopicCommand { def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")" def parseTopicConfigs(opts: TopicCommandOptions): Properties = { - val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*")) - require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".") + val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*")) + val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.split("\\s*=\\s*")) + if(opts.options.has(opts.createOpt)) + require(configsToBeDeleted.size == 0, "Invalid topic config: all configs on create topic must be in the format \"key=val\".") + require(configsToBeAdded.forall(config => config.length == 2), + "Invalid topic config: all configs to be added must be in the format \"key=val\".") + require(configsToBeDeleted.forall(config => config.length == 1), + "Invalid topic config: all configs to be deleted must be in the format \"key\".") val props = new Properties - configs.foreach(pair => props.setProperty(pair(0), pair(1))) + configsToBeAdded.foreach(pair => props.setProperty(pair(0), pair(1))) + configsToBeDeleted.foreach(pair => props.setProperty(pair(0), "")) props } - + def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = { val partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() @@ -184,10 +191,14 @@ object TopicCommand { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.") + val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered.") .withRequiredArg .describedAs("name=value") .ofType(classOf[String]) + val deleteConfigOpt = parser.accepts("deleteConfig", "A topic configuration override to be removed for an existing topic") + .withRequiredArg + .describedAs("name") + .ofType(classOf[String]) val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " + "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected") .withRequiredArg diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 51ec796..0b32aee 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -132,7 +132,7 @@ object LogConfig { /** * Check that property names are valid */ - private def validateNames(props: Properties) { + def validateNames(props: Properties) { import JavaConversions._ for(name <- props.keys) require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name)) diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index 56cae58..42e98dd 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -90,7 +90,6 @@ class TopicConfigManager(private val zkClient: ZkClient, val now = time.milliseconds val logs = logManager.logsByTopicPartition.toBuffer val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) - val lastChangeId = notifications.map(changeNumber).max for (notification <- notifications) { val changeId = changeNumber(notification) if (changeId > lastExecutedChange) {