From 7e748d69ebdd2b5c9aff9a449358aac1495d597c Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Mon, 3 Aug 2015 18:22:17 -0700 Subject: [PATCH] KAFKA-2332; Add quota metrics to old producer and consumer --- .../consumer/FetchRequestAndResponseStats.scala | 1 + .../main/scala/kafka/consumer/SimpleConsumer.scala | 2 ++ .../kafka/producer/ProducerRequestStats.scala | 1 + .../main/scala/kafka/producer/SyncProducer.scala | 8 ++++++-- 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala index 3df55e1..fb76e32 100644 --- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala +++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala @@ -34,6 +34,7 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetr val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)) val requestSizeHist = newHistogram("FetchResponseSize", biased = true, tags) + val throttleTimeHist = newHistogram("FetchRequestThrottleTimeMs", biased = true, tags) } /** diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index 7ebc040..b031da1 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -135,6 +135,8 @@ class SimpleConsumer(val host: String, val fetchedSize = fetchResponse.sizeInBytes fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize) fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize) + fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).throttleTimeHist.update(fetchResponse.throttleTimeMs) + fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.throttleTimeHist.update(fetchResponse.throttleTimeMs) fetchResponse } diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala index 026e93a..84331ac 100644 --- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala @@ -29,6 +29,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)) val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags) + val throttleTimeHist = newHistogram("ProducerRequestThrottleTimeMs", biased = true, tags) } /** diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index dcee501..7245390 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -104,8 +104,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { response = doSend(producerRequest, if(producerRequest.requiredAcks == 0) false else true) } } - if(producerRequest.requiredAcks != 0) - ProducerResponse.readFrom(response.payload) + if(producerRequest.requiredAcks != 0) { + val producerResponse = ProducerResponse.readFrom(response.payload) + producerRequestStats.getProducerRequestStats(config.host, config.port).throttleTimeHist.update(producerResponse.throttleTime) + producerRequestStats.getProducerRequestAllBrokersStats.throttleTimeHist.update(producerResponse.throttleTime) + producerResponse + } else null } -- 1.7.9.5