diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 8107a64..8ff4bd5 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -200,14 +200,21 @@ object AdminUtils extends Logging { /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers + * @param zkClient: The ZkClient handle used to write the new config to zookeeper + * @param topic: The topic for which configs are being changed + * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or + * existing configs need to be deleted, it should be done prior to invoking this API + * */ - def changeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { - LogConfig.validate(config) + def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) { if(!topicExists(zkClient, topic)) throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) - + + // remove the topic overrides + LogConfig.validate(configs) + // write the new config--may not exist if there were previously no overrides - writeTopicConfig(zkClient, topic, config) + writeTopicConfig(zkClient, topic, configs) // create the change notification zkClient.createPersistentSequential(ZkUtils.TopicConfigChangesPath + "/" + TopicConfigChangeZnodePrefix, Json.encode(topic)) @@ -217,14 +224,12 @@ object AdminUtils extends Logging { * Write out the topic config to zk, if there is any */ private def writeTopicConfig(zkClient: ZkClient, topic: String, config: Properties) { - if(config.size > 0) { - val configMap: mutable.Map[String, String] = { - import JavaConversions._ - config - } - val map = Map("version" -> 1, "config" -> configMap) - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map)) + val configMap: mutable.Map[String, String] = { + import JavaConversions._ + config } + val map = Map("version" -> 1, "config" -> configMap) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicConfigPath(topic), Json.encode(map)) } /** diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 56f3177..3c08dee 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -23,8 +23,8 @@ import kafka.utils._ import org.I0Itec.zkclient.ZkClient import scala.collection._ import scala.collection.JavaConversions._ -import kafka.common.Topic import kafka.cluster.Broker +import kafka.log.LogConfig object TopicCommand { @@ -61,7 +61,7 @@ object TopicCommand { def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) val topics = opts.options.valuesOf(opts.topicOpt) - val configs = parseTopicConfigs(opts) + val configs = parseTopicConfigsToBeAdded(opts) for (topic <- topics) { if (opts.options.has(opts.replicaAssignmentOpt)) { val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) @@ -79,8 +79,13 @@ object TopicCommand { def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt) val topic = opts.options.valueOf(opts.topicOpt) - if(opts.options.has(opts.configOpt)) { - val configs = parseTopicConfigs(opts) + if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { + val configsToBeAdded = parseTopicConfigsToBeAdded(opts) + val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) + // compile the final set of configs + val configs = AdminUtils.fetchTopicConfig(zkClient, topic) + configs.putAll(configsToBeAdded) + configsToBeDeleted.foreach(config => configs.remove(config)) AdminUtils.changeTopicConfig(zkClient, topic, configs) println("Updated config for topic \"%s\".".format(topic)) } @@ -147,14 +152,28 @@ object TopicCommand { def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")" - def parseTopicConfigs(opts: TopicCommandOptions): Properties = { - val configs = opts.options.valuesOf(opts.configOpt).map(_.split("\\s*=\\s*")) - require(configs.forall(_.length == 2), "Invalid topic config: all configs must be in the format \"key=val\".") + def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = { + val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*""")) + require(configsToBeAdded.forall(config => config.length == 2), + "Invalid topic config: all configs to be added must be in the format \"key=val\".") val props = new Properties - configs.foreach(pair => props.setProperty(pair(0), pair(1))) + configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) + LogConfig.validate(props) props } - + + def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = { + val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.split("""\s*=\s*""")) + if(opts.options.has(opts.createOpt)) + require(configsToBeDeleted.size == 0, "Invalid topic config: all configs on create topic must be in the format \"key=val\".") + require(configsToBeDeleted.forall(config => config.length == 1), + "Invalid topic config: all configs to be deleted must be in the format \"key\".") + val propsToBeDeleted = new Properties + configsToBeDeleted.foreach(pair => propsToBeDeleted.setProperty(pair(0).trim, "")) + LogConfig.validateNames(propsToBeDeleted) + configsToBeDeleted.map(pair => pair(0)) + } + def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = { val partitionList = replicaAssignmentList.split(",") val ret = new mutable.HashMap[Int, List[Int]]() @@ -184,10 +203,14 @@ object TopicCommand { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val configOpt = parser.accepts("config", "A topic configuration for the topic being created or altered.") + val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered.") .withRequiredArg .describedAs("name=value") .ofType(classOf[String]) + val deleteConfigOpt = parser.accepts("deleteConfig", "A topic configuration override to be removed for an existing topic") + .withRequiredArg + .describedAs("name") + .ofType(classOf[String]) val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " + "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected") .withRequiredArg diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 51ec796..0b32aee 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -132,7 +132,7 @@ object LogConfig { /** * Check that property names are valid */ - private def validateNames(props: Properties) { + def validateNames(props: Properties) { import JavaConversions._ for(name <- props.keys) require(LogConfig.ConfigNames.contains(name), "Unknown configuration \"%s\".".format(name)) diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala index 56cae58..42e98dd 100644 --- a/core/src/main/scala/kafka/server/TopicConfigManager.scala +++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala @@ -90,7 +90,6 @@ class TopicConfigManager(private val zkClient: ZkClient, val now = time.milliseconds val logs = logManager.logsByTopicPartition.toBuffer val logsByTopic = logs.groupBy(_._1.topic).mapValues(_.map(_._2)) - val lastChangeId = notifications.map(changeNumber).max for (notification <- notifications) { val changeId = changeNumber(notification) if (changeId > lastExecutedChange) {