From 9a313510b0b74390f2ed03c7e23addc1c1fc7c47 Mon Sep 17 00:00:00 2001
From: Dong Lin <lindong@cis.upenn.edu>
Date: Thu, 23 Jul 2015 00:05:49 -0700
Subject: [PATCH] KAFKA-2332; Add quota metrics to old producer and consumer

---
 .../consumer/FetchRequestAndResponseStats.scala    |    2 ++
 .../main/scala/kafka/consumer/SimpleConsumer.scala |    2 ++
 .../kafka/producer/ProducerRequestStats.scala      |    1 +
 .../main/scala/kafka/producer/SyncProducer.scala   |    8 ++++++--
 4 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
index 3df55e1..7ff5fe9 100644
--- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
+++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.Pool
+import org.apache.kafka.common.metrics.stats.Histogram
 
 class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
   val tags = metricId match {
@@ -34,6 +35,7 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetr
 
   val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
   val requestSizeHist = newHistogram("FetchResponseSize", biased = true, tags)
+  val throttleTimeHist = newHistogram("FetchRequestThrottleTimeMs", biased = true, tags)
 }
 
 /**
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 4e1833a..873aeff 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -135,6 +135,8 @@ class SimpleConsumer(val host: String,
     val fetchedSize = fetchResponse.sizeInBytes
     fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
     fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
+    fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).throttleTimeHist.update(fetchResponse.throttleTimeMs)
+    fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.throttleTimeHist.update(fetchResponse.throttleTimeMs)
     fetchResponse
   }
 
diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
index 026e93a..84331ac 100644
--- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
+++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala
@@ -29,6 +29,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup
 
   val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
   val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags)
+  val throttleTimeHist = newHistogram("ProducerRequestThrottleTimeMs", biased = true, tags)
 }
 
 /**
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index dcee501..7245390 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -104,8 +104,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
         response = doSend(producerRequest, if(producerRequest.requiredAcks == 0) false else true)
       }
     }
-    if(producerRequest.requiredAcks != 0)
-      ProducerResponse.readFrom(response.payload)
+    if(producerRequest.requiredAcks != 0) {
+      val producerResponse = ProducerResponse.readFrom(response.payload)
+      producerRequestStats.getProducerRequestStats(config.host, config.port).throttleTimeHist.update(producerResponse.throttleTime)
+      producerRequestStats.getProducerRequestAllBrokersStats.throttleTimeHist.update(producerResponse.throttleTime)
+      producerResponse
+    }
     else
       null
   }
-- 
1.7.9.5

