diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 8107a64..9ccd32a 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -201,13 +201,30 @@ object AdminUtils extends Logging { /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers */ - def changeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { - LogConfig.validate(config) + def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) { if(!topicExists(zkClient, topic)) throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) - + + // figure out if config overrides need to be added or removed + val allConfigs = fetchTopicConfig(zkClient, topic) + allConfigs.putAll(configs) + // remove the topic overrides + val configKeys = allConfigs.keySet() + val configIterator = configKeys.iterator() + val configsToBeRemoved = new Properties() + while(configIterator.hasNext) { + val key = configIterator.next().asInstanceOf[String] + val value = allConfigs.getProperty(key) + if(value.isEmpty) { + configIterator.remove() + configsToBeRemoved.put(key, value) + } + } + LogConfig.validate(allConfigs) + LogConfig.validateNames(configsToBeRemoved) + // write the new config--may not exist if there were previously no overrides - writeTopicConfig(zkClient, topic, config) + writeTopicConfig(zkClient, topic, allConfigs) // create the change notification zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic)) @@ -217,14 +234,12 @@ object AdminUtils extends Logging { * Write out the topic config to zk, if there is any */ private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { - if(config.size > 0) { - val configMap: mutable.Map[String, String] = { - import JavaConversions._ - config - } - val map = Map("version" -> 1, "config" -> configMap) - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map)) + val configMap: mutable.Map[String, String] = { + import JavaConversions._ + config } + val map = Map("version" -> 1, "config" -> configMap) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map)) } /** diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 56f3177..e9b45f9 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -81,7 +81,7 @@ object TopicCommand { val topic = opts.options.valueOf(opts.topicOpt) if(opts.options.has(opts.configOpt)) { val configs = parseTopicConfigs(opts) - AdminUtils.changeTopicConfig(zkClient, topic, configs) + AdminUtils.changeTopicConfig(zkClient, topic = topic, configs) println("Updated config for topic \"%s\".".format(topic)) } if(opts.options.has(opts.partitionsOpt)) { @@ -149,12 +149,18 @@ object TopicCommand { def parseTopicConfigs(opts: TopicCommandOptions): Properties = { val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*")) - require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".") + val configsToBeAdded = configs.filter(config => config.length == 2) + val configsToBeDeleted = configs.filter(config => config.length == 1) + if(opts.options.has(opts.createOpt)) + require(configsToBeDeleted.size == 0, "Invalid topic config: all configs on create topic must be in the format \"key=val\".") + require(configs.forall(config => (config.length == 1 || config.length == 2)), + "Invalid topic config: all configs must be in the format \"key=val\" or \"key=\".") val props = new Properties - configs.foreach(pair => props.setProperty(pair(0), pair(1))) + configsToBeAdded.foreach(pair => props.setProperty(pair(0), pair(1))) + configsToBeDeleted.foreach(pair => props.setProperty(pair(0), "")) props } - + def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = { val partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 51ec796..0b32aee 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -132,7 +132,7 @@ object LogConfig { /** * Check that property names are valid */ - private def validateNames(props: Properties) { + def validateNames(props: Properties) { import JavaConversions._ for(name <- props.keys) require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name)) diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index 56cae58..42e98dd 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -90,7 +90,6 @@ class TopicConfigManager(private val zkClient: ZkClient, val now = time.milliseconds val logs = logManager.logsByTopicPartition.toBuffer val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) - val lastChangeId = notifications.map(changeNumber).max for (notification <- notifications) { val changeId = changeNumber(notification) if (changeId > lastExecutedChange) {