From 2c552c205b3eb8454098c32a933d4b81fb37d950 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 7 Jan 2015 12:34:13 -0800 Subject: [PATCH] KAFKA-1844. Remove max.message.bytes from topic config. --- core/src/main/scala/kafka/admin/TopicCommand.scala | 4 ++-- core/src/main/scala/kafka/log/LogConfig.scala | 22 ++++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 285c033..42b355e 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -205,7 +205,7 @@ object TopicCommand { "Invalid topic config: all configs to be added must be in the format \"key=val\".") val props = new Properties configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) - LogConfig.validate(props) + LogConfig.validateTopicConfig(props) props } @@ -214,7 +214,7 @@ object TopicCommand { val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim()) val propsToBeDeleted = new Properties configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, "")) - LogConfig.validateNames(propsToBeDeleted) + LogConfig.validateTopicNames(propsToBeDeleted) configsToBeDeleted } else diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index ca7a99e..5dea15f 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -146,6 +146,7 @@ object LogConfig { val MinInSyncReplicasDoc = "If number of insync replicas drops below this number, we stop accepting writes with" + " -1 (or all) required acks" + private val invalidTopicConfig = Set(MaxMessageBytesProp) private val configDef = { import ConfigDef.Range._ import ConfigDef.ValidString._ @@ -225,6 +226,18 @@ object LogConfig { } /** + * Check that property names are valid for topic config + */ + def validateTopicNames(props: Properties) { + import JavaConversions._ + val names = configDef.names() + for(name <- props.keys) { + require(names.contains(name), "Unknown configuration \"%s\".".format(name)) + require(!invalidTopicConfig.contains(name), "Invalid configuraiton \"%s\".".format(name)) + } + } + + /** * Check that the given properties contain only valid log config names and that all values can be parsed and are valid */ def validate(props: Properties) { @@ -232,4 +245,13 @@ object LogConfig { configDef.parse(props) } + /** + * Check that the given properties contain only valid topic config names and that all values can be parsed and + * are valid for topic config. + */ + def validateTopicConfig(props: Properties) { + validateTopicNames(props) + configDef.parse(props) + } + } -- 1.9.3 (Apple Git-50)