Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(revision 1293014)
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(working copy)
@@ -75,6 +75,8 @@
     catch {
       case e =>
         error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
+        BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
+        BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
         throw e
     }
   }
@@ -113,6 +115,8 @@
     catch {
       case e =>
         error("error when processing request " + fetchRequest, e)
+        BrokerTopicStat.getBrokerTopicStat(fetchRequest.topic).recordFailedFetchRequest
+        BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
         response = new MessageSetSend(MessageSet.Empty, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }
     response
@@ -132,6 +136,8 @@
   def getMessagesIn: Long
   def getBytesIn: Long
   def getBytesOut: Long
+  def getFailedProduceRequest: Long
+  def getFailedFetchRequest: Long
 }
 
 @threadsafe
@@ -139,6 +145,8 @@
   private val numCumulatedMessagesIn = new AtomicLong(0)
   private val numCumulatedBytesIn = new AtomicLong(0)
   private val numCumulatedBytesOut = new AtomicLong(0)
+  private val numCumulatedFailedProduceRequests = new AtomicLong(0)
+  private val numCumulatedFailedFetchRequests = new AtomicLong(0)
 
   def getMessagesIn: Long = numCumulatedMessagesIn.get
 
@@ -151,6 +159,14 @@
   def getBytesOut: Long = numCumulatedBytesOut.get
 
   def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes)
+
+  def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement
+
+  def getFailedProduceRequest = numCumulatedFailedProduceRequests.get()
+
+  def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement
+
+  def getFailedFetchRequest = numCumulatedFailedFetchRequests.get()
 }
 
 object BrokerTopicStat extends Logging {
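
Note: the patch records each failed produce/fetch request twice, once against the requested topic's stats and once against a broker-wide aggregate, using thread-safe AtomicLong counters exposed through the BrokerTopicStatMBean trait. The following is a minimal, self-contained sketch of that counter pattern, not the patch itself; the names TopicFailureStats, AllTopicsKey, and FailureCounterExample are illustrative stand-ins, not Kafka's own BrokerTopicStat registry.

// Sketch only: a simplified version of the per-topic failure counters the
// patch introduces. All names here are hypothetical.
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

class TopicFailureStats {
  // One monotonically increasing counter per failure type, mirroring the
  // AtomicLong fields the patch adds to BrokerTopicStat.
  private val failedProduceRequests = new AtomicLong(0)
  private val failedFetchRequests = new AtomicLong(0)

  def recordFailedProduceRequest(): Long = failedProduceRequests.getAndIncrement()
  def recordFailedFetchRequest(): Long = failedFetchRequests.getAndIncrement()

  def getFailedProduceRequest: Long = failedProduceRequests.get()
  def getFailedFetchRequest: Long = failedFetchRequests.get()
}

object TopicFailureStats {
  private val statsByTopic = new ConcurrentHashMap[String, TopicFailureStats]()
  private val AllTopicsKey = "AllTopics" // hypothetical key for the broker-wide aggregate

  // Lazily create one stats holder per topic; safe under concurrent handler threads.
  def forTopic(topic: String): TopicFailureStats = {
    val existing = statsByTopic.get(topic)
    if (existing != null) existing
    else {
      statsByTopic.putIfAbsent(topic, new TopicFailureStats)
      statsByTopic.get(topic)
    }
  }

  def allTopics: TopicFailureStats = forTopic(AllTopicsKey)
}

object FailureCounterExample extends App {
  // Mirrors the handlers' catch blocks: on a failed produce request, bump both
  // the per-topic counter and the broker-wide aggregate.
  TopicFailureStats.forTopic("my-topic").recordFailedProduceRequest()
  TopicFailureStats.allTopics.recordFailedProduceRequest()
  println(TopicFailureStats.forTopic("my-topic").getFailedProduceRequest) // prints 1
}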