From 743deff7bb94211cd570e519266e3b8a0e122d8f Mon Sep 17 00:00:00 2001 From: Edward Ribeiro Date: Sat, 18 Jul 2015 00:34:29 -0300 Subject: [PATCH] KAFKA-2338 Warn users if they change max.message.bytes that they also need to update broker and consumer settings --- core/src/main/scala/kafka/admin/TopicCommand.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 4e28bf1..d21a726 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -26,6 +26,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException import scala.collection._ import scala.collection.JavaConversions._ import kafka.log.LogConfig +import kafka.log.Defaults import kafka.consumer.Whitelist import org.apache.kafka.common.utils.Utils import kafka.coordinator.ConsumerCoordinator @@ -42,7 +43,7 @@ object TopicCommand extends Logging { // should have exactly one action val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) - if(actions != 1) + if(actions != 1) CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete") opts.checkArgs() @@ -82,11 +83,18 @@ object TopicCommand extends Logging { allTopics } + private def checkMaxMessageSize(configs: Properties) { + val maxMessageSize = Integer.parseInt(configs.getProperty(LogConfig.MaxMessageBytesProp, "0")); + if (maxMessageSize > Defaults.MaxMessageSize) + println("WARNING: %s has been increased beyond the max value of %d, update broker and consumer settings as well".format(LogConfig.MaxMessageBytesProp, Defaults.MaxMessageSize)) + } + def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topic = opts.options.valueOf(opts.topicOpt) val configs = parseTopicConfigsToBeAdded(opts) if (Topic.hasCollisionChars(topic)) println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.") + checkMaxMessageSize(configs) if (opts.options.has(opts.replicaAssignmentOpt)) { val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs, update = false) @@ -109,6 +117,7 @@ object TopicCommand extends Logging { val configs = AdminUtils.fetchTopicConfig(zkClient, topic) if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { val configsToBeAdded = parseTopicConfigsToBeAdded(opts) + checkMaxMessageSize(configsToBeAdded) val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) // compile the final set of configs configs.putAll(configsToBeAdded) -- 2.4.6