From 80b8df7c85e4c18de2b16c3415bc05111e9c8518 Mon Sep 17 00:00:00 2001
From: Dong Lin
Date: Sat, 25 Apr 2015 16:15:35 -0700
Subject: [PATCH] KAFKA-1936; Track offset commit requests separately from
 produce requests

Offset commits are written as messages to the internal offsets topic, so
they were being counted in the broker-wide BrokerAllTopicsStats produce
and fetch metrics alongside regular client traffic. Skip the all-topics
meters for OffsetManager.OffsetsTopicName so that the aggregate metrics
reflect client traffic only; the per-topic metrics for the offsets topic
still record offset commit activity. Also add a test verifying that an
offset commit does not move the all-topics rates.
---
 core/src/main/scala/kafka/log/Log.scala            | 10 ++++--
 .../main/scala/kafka/server/ReplicaManager.scala   | 23 ++++++++----
 .../scala/unit/kafka/server/OffsetCommitTest.scala | 38 +++++++++++++++++++-
 3 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 5563f2d..314aac1 100755
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -21,7 +21,7 @@ import kafka.utils._
 import kafka.message._
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.{LogOffsetMetadata, FetchDataInfo, BrokerTopicStats}
+import kafka.server.{OffsetManager, LogOffsetMetadata, FetchDataInfo, BrokerTopicStats}
 
 import java.io.{IOException, File}
 import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
@@ -305,7 +305,9 @@ class Log(val dir: File,
           // we record the original message set size instead of trimmed size
           // to be consistent with pre-compression bytesRejectedRate recording
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
-          BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+          if (topicAndPartition.topic != OffsetManager.OffsetsTopicName) {
+            BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+          }
           throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
             .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
         }
@@ -379,7 +381,9 @@ class Log(val dir: File,
       val messageSize = MessageSet.entrySize(m)
       if(messageSize > config.maxMessageSize) {
         BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)
-        BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+        if (topicAndPartition.topic != OffsetManager.OffsetsTopicName) {
+          BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)
+        }
         throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
           .format(messageSize, config.maxMessageSize))
       }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8ddd325..a824ce4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -349,11 +349,12 @@ class ReplicaManager(val config: KafkaConfig,
     trace("Append [%s] to local log ".format(messagesPerPartition))
     messagesPerPartition.map { case (topicAndPartition, messages) =>
       BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).totalProduceRequestRate.mark()
-      BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
+      if (topicAndPartition.topic != OffsetManager.OffsetsTopicName) {
+        BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
+      }
 
       // reject appending to internal topics if it is not allowed
       if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) {
-
         (topicAndPartition, LogAppendResult(
           LogAppendInfo.UnknownLogAppendInfo,
           Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicAndPartition.topic)))))
@@ -375,9 +376,11 @@ class ReplicaManager(val config: KafkaConfig,
 
           // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesInRate.mark(messages.sizeInBytes)
-          BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).messagesInRate.mark(numAppendedMessages)
-          BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
+          if (topicAndPartition.topic != OffsetManager.OffsetsTopicName) {
+            BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
+            BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)
+          }
 
           trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
             .format(messages.sizeInBytes, topicAndPartition.topic, topicAndPartition.partition, info.firstOffset, info.lastOffset))
@@ -401,7 +404,9 @@ class ReplicaManager(val config: KafkaConfig,
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
           case t: Throwable =>
             BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
-            BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
+            if (topicAndPartition.topic != OffsetManager.OffsetsTopicName) {
+              BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
+            }
             error("Error processing append operation on partition %s".format(topicAndPartition), t)
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
         }
@@ -471,7 +476,9 @@ class ReplicaManager(val config: KafkaConfig,
 
     readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
       BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark()
-      BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
+      if (topic != OffsetManager.OffsetsTopicName) {
+        BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
+      }
 
       val partitionDataAndOffsetInfo =
         try {
@@ -520,7 +527,9 @@ class ReplicaManager(val config: KafkaConfig,
           LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(oor))
         case e: Throwable =>
           BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
-          BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
+          if (topic != OffsetManager.OffsetsTopicName) {
+            BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()
+          }
           error("Error processing fetch operation on partition [%s,%d] offset %d".format(topic, partition, offset))
           LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, false, Some(e))
       }
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 652208a..db2d07f 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -29,7 +29,7 @@ import org.scalatest.junit.JUnit3Suite
 
 import java.util.Properties
 import java.io.File
-
+import scala.collection.JavaConversions._
 import scala.util.Random
 import scala.collection._
 
@@ -75,6 +75,42 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     super.tearDown()
   }
 
+
+  @Test
+  def testOffsetMetrics() {
+    val topic = "test-topic"
+    val offsetTopic = OffsetManager.OffsetsTopicName
+
+    // create the test topic
+    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server))
+    // produce a couple of messages to it
+    val sentMessages = sendMessages(Seq(server), topic, 2)
+
+    val initialAllMessagesInRate = BrokerTopicStats.getBrokerAllTopicsStats().messagesInRate.count()
+    val initialAllBytesInRate = BrokerTopicStats.getBrokerAllTopicsStats().bytesInRate.count()
+    val initialAllTotalProduceRequestRate = BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.count()
+
+    // commit an offset; this appends to the offsets topic but should not move the all-topics rates
+    val simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client")
+    val commitRequest = OffsetCommitRequest("group9", immutable.Map(TopicAndPartition(topic, 0) -> OffsetAndMetadata(offset = 42L)))
+    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+
+    assertEquals("All-topics messagesInRate should not increment after an offset commit", initialAllMessagesInRate,
+      BrokerTopicStats.getBrokerAllTopicsStats().messagesInRate.count())
+
+    assertEquals("All-topics bytesInRate should not increment after an offset commit", initialAllBytesInRate,
+      BrokerTopicStats.getBrokerAllTopicsStats().bytesInRate.count())
+
+    assertEquals("All-topics totalProduceRequestRate should not increment after an offset commit", initialAllTotalProduceRequestRate,
+      BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.count())
+
+    assertTrue("Offsets topic messagesInRate should be larger than zero", BrokerTopicStats.getBrokerTopicStats(offsetTopic).messagesInRate.count() > 0)
+    assertTrue("Offsets topic bytesInRate should be larger than zero", BrokerTopicStats.getBrokerTopicStats(offsetTopic).bytesInRate.count() > 0)
+    assertTrue("Offsets topic totalProduceRequestRate should be larger than zero", BrokerTopicStats.getBrokerTopicStats(offsetTopic).totalProduceRequestRate.count() > 0)
+  }
+
+
+  @Test
   def testUpdateOffsets() {
     val topic = "topic"
-- 
1.7.9.5