From daa20a4894739cc509c992f63d05779978a2f4d4 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Tue, 12 May 2015 07:31:30 -0700 Subject: [PATCH] KAFKA-1936; Track offset commit requests separately from produce requests --- core/src/main/scala/kafka/log/Log.scala | 8 ++-- core/src/main/scala/kafka/server/KafkaApis.scala | 3 +- .../scala/kafka/server/KafkaRequestHandler.scala | 49 ++++++++++++++++++++ .../main/scala/kafka/server/ReplicaManager.scala | 19 +++----- .../scala/unit/kafka/server/OffsetCommitTest.scala | 38 ++++++++++++++- 5 files changed, 96 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 84e7b8f..0110836 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -21,7 +21,7 @@ import kafka.utils._ import kafka.message._ import kafka.common._ import kafka.metrics.KafkaMetricsGroup -import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats} +import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats, BrokerTopicMetrics} import java.io.{IOException, File} import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} @@ -321,8 +321,7 @@ class Log(val dir: File, if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) { // we record the original message set size instead of trimmed size // to be consistent with pre-compression bytesRejectedRate recording - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes) + BrokerTopicStats.markBytesRejectedRate(topicAndPartition.topic, messages.sizeInBytes) throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." 
.format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize)) } @@ -395,8 +394,7 @@ class Log(val dir: File, // Check if the message sizes are valid. val messageSize = MessageSet.entrySize(m) if(messageSize > config.maxMessageSize) { - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes) + BrokerTopicStats.markBytesRejectedRate(topicAndPartition.topic, messages.sizeInBytes) throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." .format(messageSize, config.maxMessageSize)) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 417960d..dc3f5f8 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -300,8 +300,7 @@ class KafkaApis(val requestChannel: RequestChannel, } // record the bytes out metrics only when the response is being sent - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes) + BrokerTopicStats.markBytesOutRate(topicAndPartition.topic, data.messages.sizeInBytes) } val response = FetchResponse(fetchRequest.correlationId, responsePartitionData) diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index a1558af..dfb1bf9 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -20,6 +20,7 @@ package kafka.server import kafka.network._ import kafka.utils._ import kafka.metrics.KafkaMetricsGroup +import kafka.common.Topic import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Meter import 
org.apache.kafka.common.utils.Utils @@ -120,4 +121,52 @@ object BrokerTopicStats extends Logging { def getBrokerTopicStats(topic: String): BrokerTopicMetrics = { stats.getAndMaybePut(topic) } + + def markMessagesInRate(topic: String, n: Long): Unit = { + getBrokerTopicStats(topic).messagesInRate.mark(n) + if (!Topic.InternalTopics.contains(topic)) + allTopicsStats.messagesInRate.mark(n) + } + + def markBytesInRate(topic: String, n: Long): Unit = { + getBrokerTopicStats(topic).bytesInRate.mark(n) + if (!Topic.InternalTopics.contains(topic)) + allTopicsStats.bytesInRate.mark(n) + } + + def markBytesOutRate(topic: String, n: Long): Unit = { + getBrokerTopicStats(topic).bytesOutRate.mark(n) + if (!Topic.InternalTopics.contains(topic)) + allTopicsStats.bytesOutRate.mark(n) + } + + def markBytesRejectedRate(topic: String, n: Long): Unit = { + getBrokerTopicStats(topic).bytesRejectedRate.mark(n) + if (!Topic.InternalTopics.contains(topic)) + allTopicsStats.bytesRejectedRate.mark(n) + } + + def markFailedProduceRequestRate(topic: String, n: Long): Unit = { + getBrokerTopicStats(topic).failedProduceRequestRate.mark(n) + if (!Topic.InternalTopics.contains(topic)) + allTopicsStats.failedProduceRequestRate.mark(n) + } + + def markFailedFetchRequestRate(topic: String, n: Long): Unit = { + getBrokerTopicStats(topic).failedFetchRequestRate.mark(n) + if (!Topic.InternalTopics.contains(topic)) + allTopicsStats.failedFetchRequestRate.mark(n) + } + + def markTotalProduceRequestRate(topic: String, n: Long): Unit = { + getBrokerTopicStats(topic).totalProduceRequestRate.mark(n) + if (!Topic.InternalTopics.contains(topic)) + allTopicsStats.totalProduceRequestRate.mark(n) + } + + def markTotalFetchRequestRate(topic: String, n: Long): Unit = { + getBrokerTopicStats(topic).totalFetchRequestRate.mark(n) + if (!Topic.InternalTopics.contains(topic)) + allTopicsStats.totalFetchRequestRate.mark(n) + } } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala index 59c9bc3..9dfa913 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -348,12 +348,10 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = { trace("Append [%s] to local log ".format(messagesPerPartition)) messagesPerPartition.map { case (topicAndPartition, messages) => - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).totalProduceRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark() + BrokerTopicStats.markTotalProduceRequestRate(topicAndPartition.topic, 1) // reject appending to internal topics if it is not allowed if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) { - (topicAndPartition, LogAppendResult( LogAppendInfo.UnknownLogAppendInfo, Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic))))) @@ -374,10 +372,8 @@ class ReplicaManager(val config: KafkaConfig, info.lastOffset - info.firstOffset + 1 // update stats for successfully appended bytes and messages as bytesInRate and messageInRate - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages) - BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages) + BrokerTopicStats.markBytesInRate(topicAndPartition.topic, messages.sizeInBytes) + BrokerTopicStats.markMessagesInRate(topicAndPartition.topic, numAppendedMessages) trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d" .format(messages.sizeInBytes, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset)) @@ -400,8 
+396,7 @@ class ReplicaManager(val config: KafkaConfig, case imse : InvalidMessageSizeException => (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse))) case t: Throwable => - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() + BrokerTopicStats.markFailedProduceRequestRate(topicAndPartition.topic, 1) error("Error processing append operation on partition %s".format(topicAndPartition), t) (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t))) } @@ -470,8 +465,7 @@ class ReplicaManager(val config: KafkaConfig, readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = { readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) => - BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark() + BrokerTopicStats.markTotalFetchRequestRate(topic, 1) val partitionDataAndOffsetInfo = try { @@ -519,8 +513,7 @@ class ReplicaManager(val config: KafkaConfig, case oor : OffsetOutOfRangeException => LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(oor)) case e: Throwable => - BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark() - BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark() + BrokerTopicStats.markFailedFetchRequestRate(topic, 1) error("Error processing fetch operation on partition [%s,%d] offset %d".format(topic, partition, offset)) LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(e)) } diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 528525b..9208362 100755 
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -29,7 +29,7 @@ import org.scalatest.junit.JUnit3Suite import java.util.Properties import java.io.File - +import scala.collection.JavaConversions._ import scala.util.Random import scala.collection._ @@ -75,6 +75,42 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { super.tearDown() } + + @Test + def testOffsetMetrics() { + val topic = "test-topic" + val offsetTopic = OffsetManager.OffsetsTopicName + + // create topic + createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) + // send message + val sentMessages = sendMessages(Seq(server), topic, 2) + + val initialAllMessagesInRate = BrokerTopicStats.getBrokerAllTopicsStats().messagesInRate.count() + val initialAllBytesInRate = BrokerTopicStats.getBrokerAllTopicsStats().bytesInRate.count() + val initialAllTotalProduceRequestRate = BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.count() + + // commit offset + val simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client") + val commitRequest = OffsetCommitRequest("group9", immutable.Map(TopicAndPartition(topic, 0) -> OffsetAndMetadata(offset=42L))) + val commitResponse = simpleConsumer.commitOffsets(commitRequest) + + assertEquals("Counts should not increment after commit", initialAllMessagesInRate, + BrokerTopicStats.getBrokerAllTopicsStats().messagesInRate.count()) + + assertEquals("Counts should not increment after commit", initialAllBytesInRate, + BrokerTopicStats.getBrokerAllTopicsStats().bytesInRate.count()) + + assertEquals("Counts should not increment after commit", initialAllTotalProduceRequestRate, + BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.count()) + + assertTrue("produce stats should be larger than zero", BrokerTopicStats.getBrokerTopicStats(offsetTopic).messagesInRate.count() > 
0) + assertTrue("produce stats should be larger than zero", BrokerTopicStats.getBrokerTopicStats(offsetTopic).bytesInRate.count() > 0) + assertTrue("produce stats should be larger than zero", BrokerTopicStats.getBrokerTopicStats(offsetTopic).totalProduceRequestRate.count() > 0) + } + + + @Test def testUpdateOffsets() { val topic = "topic" -- 1.7.9.5