From 913f282dda427fa24e01b4b4828d6d644f5f4dbe Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Thu, 23 Jul 2015 00:05:49 -0700 Subject: [PATCH] KAFKA-2332; Add quota metrics to old producer and consumer --- .../consumer/FetchRequestAndResponseStats.scala | 2 ++ .../main/scala/kafka/consumer/SimpleConsumer.scala | 2 ++ .../kafka/producer/ProducerRequestStats.scala | 1 + .../main/scala/kafka/producer/SyncProducer.scala | 8 ++++++-- 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala index 3df55e1..7ff5fe9 100644 --- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala +++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker} import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.utils.Pool +import org.apache.kafka.common.metrics.stats.Histogram class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup { val tags = metricId match { @@ -34,6 +35,7 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetr val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)) val requestSizeHist = newHistogram("FetchResponseSize", biased = true, tags) + val throttleTimeHist = newHistogram("FetchRequestThrottleTimeMs", biased = true, tags) } /** diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 7ebc040..b031da1 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -135,6 +135,8 @@ class SimpleConsumer(val host: String, val fetchedSize = fetchResponse.sizeInBytes fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize) fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize) + fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).throttleTimeHist.update(fetchResponse.throttleTimeMs) + fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.throttleTimeHist.update(fetchResponse.throttleTimeMs) fetchResponse } diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala index 026e93a..84331ac 100644 --- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala @@ -29,6 +29,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)) val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags) + val throttleTimeHist = newHistogram("ProducerRequestThrottleTimeMs", biased = true, tags) } /** diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index dcee501..7245390 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -104,8 +104,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { response = doSend(producerRequest, if(producerRequest.requiredAcks == 0) false else true) } } - if(producerRequest.requiredAcks != 0) - ProducerResponse.readFrom(response.payload) + if(producerRequest.requiredAcks != 0) { + val producerResponse = ProducerResponse.readFrom(response.payload) + producerRequestStats.getProducerRequestStats(config.host, config.port).throttleTimeHist.update(producerResponse.throttleTime) + producerRequestStats.getProducerRequestAllBrokersStats.throttleTimeHist.update(producerResponse.throttleTime) + producerResponse + } else null } -- 1.7.9.5