From d6fc88ca597860487608bb4d4f065c5f6455333a Mon Sep 17 00:00:00 2001 From: Edward Ribeiro Date: Fri, 17 Jul 2015 16:30:17 -0300 Subject: [PATCH] KAFKA-2338 Warn users if they change max.message.bytes that they also need to update broker and consumer settings --- core/src/main/scala/kafka/admin/TopicCommand.scala | 27 ++++++++++++++++------ .../scala/kafka/server/AbstractFetcherThread.scala | 3 +++ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index f1405a5..b7b94f8 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -27,6 +27,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException import scala.collection._ import scala.collection.JavaConversions._ import kafka.log.LogConfig +import kafka.log.Defaults import kafka.consumer.Whitelist import kafka.server.{ConfigType, OffsetManager} import org.apache.kafka.common.utils.Utils @@ -39,12 +40,12 @@ object TopicCommand extends Logging { val opts = new TopicCommandOptions(args) - if(args.length == 0) + if (args.length == 0) CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.") // should have exactly one action val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) - if(actions != 1) + if (actions != 1) CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete") opts.checkArgs() @@ -54,13 +55,13 @@ object TopicCommand extends Logging { try { if(opts.options.has(opts.createOpt)) createTopic(zkClient, opts) - else if(opts.options.has(opts.alterOpt)) + else if (opts.options.has(opts.alterOpt)) alterTopic(zkClient, opts) - else if(opts.options.has(opts.listOpt)) + else if (opts.options.has(opts.listOpt)) listTopics(zkClient, opts) - else if(opts.options.has(opts.describeOpt)) + else if (opts.options.has(opts.describeOpt)) describeTopic(zkClient, opts) - else if(opts.options.has(opts.deleteOpt)) + else if (opts.options.has(opts.deleteOpt)) deleteTopic(zkClient, opts) } catch { case e: Throwable => @@ -84,11 +85,19 @@ object TopicCommand extends Logging { allTopics } + private def checkMaxMessageSize(configs: Properties) { + val maxMessageSize = Integer.parseInt(configs.getProperty(LogConfig.MaxMessageBytesProp, "0")); + if (maxMessageSize > Defaults.MaxMessageSize) + println("WARNING: %s has been increased beyond the max value of %d, " + + "update broker and consumer settings as well".format(LogConfig.MaxMessageBytesProp, Defaults.MaxMessageSize)) + } + def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topic = opts.options.valueOf(opts.topicOpt) val configs = parseTopicConfigsToBeAdded(opts) if (Topic.hasCollisionChars(topic)) println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.") + checkMaxMessageSize(configs) if (opts.options.has(opts.replicaAssignmentOpt)) { val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs, update = false) @@ -107,6 +116,10 @@ object TopicCommand extends Logging { throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt), opts.options.valueOf(opts.zkConnectOpt))) } + if (opts.options.has(opts.configOpt)) { + val configsToBeAdded = parseTopicConfigsToBeAdded(opts) + checkMaxMessageSize(configsToBeAdded) + } topics.foreach { topic => if(opts.options.has(opts.partitionsOpt)) { if (topic == ConsumerCoordinator.OffsetsTopicName) { @@ -124,7 +137,7 @@ object TopicCommand extends Logging { def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) { val topics = getTopics(zkClient, opts) - for(topic <- topics) { + for (topic <- topics) { if (ZkUtils.pathExists(zkClient,ZkUtils.getDeleteTopicPath(topic))) { println("%s - marked for deletion".format(topic)) } else { diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index dca975c..b7247c8 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -173,6 +173,9 @@ abstract class AbstractFetcherThread(name: String, error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) partitionsWithError += topicAndPartition } + case ErrorMapping.MessageSizeTooLargeCode => + error("Error for partition [%s,%d] to broker %d: message size too large".format(topic, partitionId, sourceBroker.id)) + partitionsWithError += topicAndPartition case _ => if (isRunning.get) { error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, -- 2.1.4