diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index b2a7170..34c5376 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -265,9 +265,6 @@ private[kafka] class Log(val dir: File, if(messageSetInfo.count == 0) { (-1L, -1L) } else { - BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count) - BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(messageSetInfo.count) - // trim any invalid bytes or partial messages before appending it to the on-disk log var validMessages = trimInvalidBytes(messages) @@ -288,6 +285,9 @@ private[kafka] class Log(val dir: File, case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e) } val lastOffset = offsetCounter.get - 1 + val numMessages = lastOffset - firstOffset + 1 + BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages) + BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages) (firstOffset, lastOffset) } else { require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)