From 28879485fe94a96c7292cb948879db4909aba28c Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 3 Feb 2015 10:49:05 -0800 Subject: [PATCH] Fixing KAFKA-1914. Adding metrics to count total number of produce and fetch metrics --- core/src/main/scala/kafka/server/KafkaRequestHandler.scala | 2 ++ core/src/main/scala/kafka/server/ReplicaManager.scala | 6 ++++++ core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala | 8 +++++++- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index e4053fb..4d86bdf 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -105,6 +105,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup { val bytesRejectedRate = newMeter("BytesRejectedPerSec", "bytes", TimeUnit.SECONDS, tags) val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags) val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags) + val totalProduceRequestRate = newMeter("TotalProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags) + val totalFetchRequestRate = newMeter("TotalFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags) } object BrokerTopicStats extends Logging { diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index fb948b9..66c7061 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -298,6 +298,9 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = { trace("Append [%s] to local log ".format(messagesPerPartition)) messagesPerPartition.map { case (topicAndPartition, messages) => + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).totalProduceRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark() + // reject appending to internal topics if it is not allowed if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) { @@ -410,6 +413,9 @@ class ReplicaManager(val config: KafkaConfig, readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = { readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => + BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark() + BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark() + val partitionDataAndOffsetInfo = try { trace("Fetching log segment for topic %s, partition %d, offset %d, size %d".format(topic, partition, offset, fetchSize)) diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala index ccf5e2e..f126718 100644 --- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala @@ -129,13 +129,19 @@ class SimpleFetchTest extends JUnit3Suite { * should only return data up to the HW of the partition; when a fetch operation with read * committed data turned off is received, the replica manager could return data up to the LEO * of the local leader replica's log. + * + * This test also verifies counts of fetch requests recorded by the ReplicaManager */ def testReadFromLog() { + val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count(); + val initialAllTopicsCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count(); assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW, replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) - assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO, replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message) + + assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()); + assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()); } } -- 1.7.12.4