From 2540b6cb07f4e36898d99909957c96f830a79dad Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 10 Feb 2015 13:51:27 -0800 Subject: [PATCH 1/3] Change to not count MessageSetSizeTooLarge and MessageSizeTooLarge exceptions as failed producer requests from the brokers perspective --- core/src/main/scala/kafka/server/ReplicaManager.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index fb948b9..2ece002 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -340,11 +340,15 @@ class ReplicaManager(val config: KafkaConfig, (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe))) case nle: NotLeaderForPartitionException => (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle))) - case e: Throwable => + case mtl: MessageSizeTooLargeException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtl))) + case mstl: MessageSetSizeTooLargeException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstl))) + case t: Throwable => BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() - error("Error processing append operation on partition %s".format(topicAndPartition), e) - (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e))) + error("Error processing append operation on partition %s".format(topicAndPartition), t) + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t))) } } } -- 1.7.12.4 From fd0d9e91567e7b998375a17cdd37ac5871a537af Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Wed, 18 Feb 2015 15:48:35 -0800 Subject: [PATCH 2/3] Fixing failing unit test --- core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index 525c835..c022bba 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -140,5 +140,9 @@ class SimpleFetchTest extends JUnit3Suite { replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO, replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) + + assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()); + assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()); + } } -- 1.7.12.4 From 67dc2e657de1f831b2b2f1a6910542d6b59244f8 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Wed, 18 Feb 2015 15:49:25 -0800 Subject: [PATCH 3/3] Fixing failing unit test --- core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index c022bba..fd8f32c 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -143,6 +143,5 @@ class SimpleFetchTest extends JUnit3Suite { assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()); assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()); - } } -- 1.7.12.4