From 2540b6cb07f4e36898d99909957c96f830a79dad Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 10 Feb 2015 13:51:27 -0800 Subject: [PATCH] Change to not count MessageSetSizeTooLarge and MessageSizeTooLarge exceptions as failed producer requests from the brokers perspective --- core/src/main/scala/kafka/server/ReplicaManager.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index fb948b9..2ece002 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -340,11 +340,15 @@ class ReplicaManager(val config: KafkaConfig, (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe))) case nle: NotLeaderForPartitionException => (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle))) - case e: Throwable => + case mtl: MessageSizeTooLargeException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtl))) + case mstl: MessageSetSizeTooLargeException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstl))) + case t: Throwable => BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() - error("Error processing append operation on partition %s".format(topicAndPartition), e) - (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) + error("Error processing append operation on partition %s".format(topicAndPartition), t) + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t))) } } } -- 1.7.12.4