From 38b96eeb93698c5e7afb523761f2def5907a0954 Mon Sep 17 00:00:00 2001 From: Edward Ribeiro Date: Sat, 18 Jul 2015 00:34:29 -0300 Subject: [PATCH] KAFKA-2338 Warn users if they change max.message.bytes that they also need to update broker and consumer settings --- core/src/main/scala/kafka/admin/TopicCommand.scala | 14 +++++++++++++- .../main/scala/kafka/server/AbstractFetcherThread.scala | 3 +++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index a90aa87..7d3bbf6 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -26,6 +26,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException import scala.collection._ import scala.collection.JavaConversions._ import kafka.log.LogConfig +import kafka.log.Defaults import kafka.consumer.Whitelist import org.apache.kafka.common.utils.Utils import kafka.coordinator.ConsumerCoordinator @@ -42,7 +43,7 @@ object TopicCommand extends Logging { // should have exactly one action val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) - if(actions != 1) + if(actions != 1) CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete") opts.checkArgs() @@ -82,9 +83,19 @@ object TopicCommand extends Logging { allTopics } + private def checkMaxMessageSize(configs: Properties) { + val maxMessageSize = Integer.parseInt(configs.getProperty(LogConfig.MaxMessageBytesProp, "0")); + if (maxMessageSize > Defaults.MaxMessageSize) + println("WARNING: %s has been increased beyond the max value of %d, " + + "update broker and consumer settings as well".format(LogConfig.MaxMessageBytesProp, Defaults.MaxMessageSize)) + } + def createTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topic = opts.options.valueOf(opts.topicOpt) val configs = parseTopicConfigsToBeAdded(opts) + + checkMaxMessageSize(configs) + if (opts.options.has(opts.replicaAssignmentOpt)) { val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, assignment, configs) @@ -107,6 +118,7 @@ object TopicCommand extends Logging { val configs = AdminUtils.fetchTopicConfig(zkClient, topic) if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) { val configsToBeAdded = parseTopicConfigsToBeAdded(opts) + checkMaxMessageSize(configsToBeAdded) val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) // compile the final set of configs configs.putAll(configsToBeAdded) diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index f843061..c1b60ad 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -163,6 +163,9 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) partitionsWithError += topicAndPartition } + case ErrorMapping.MessageSizeTooLargeCode => + error("Error for partition [%s,%d] to broker %d: message size too large".format(topic, partitionId, sourceBroker.id)) + partitionsWithError += topicAndPartition case _ => if (isRunning.get) { error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, -- 2.4.6