diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 77d7ec0..0fd1c16 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -45,6 +45,7 @@ object RequestChannel extends Logging { @volatile var dequeueTimeMs = -1L @volatile var apiLocalCompleteTimeMs = -1L @volatile var responseCompleteTimeMs = -1L + @volatile var responseDequeueTimeMs = -1L val requestId = buffer.getShort() val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) buffer = null @@ -60,7 +61,8 @@ object RequestChannel extends Logging { val queueTime = (dequeueTimeMs - startTimeMs).max(0L) val apiLocalTime = (apiLocalCompleteTimeMs - dequeueTimeMs).max(0L) val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L) - val responseSendTime = (endTimeMs - responseCompleteTimeMs).max(0L) + val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L) + val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L) val totalTime = endTimeMs - startTimeMs var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId))) if (requestId == RequestKeys.FetchKey) { @@ -75,12 +77,13 @@ object RequestChannel extends Logging { m.queueTimeHist.update(queueTime) m.localTimeHist.update(apiLocalTime) m.remoteTimeHist.update(apiRemoteTime) + m.responseQueueTimeHist.update(responseQueueTime) m.responseSendTimeHist.update(responseSendTime) m.totalTimeHist.update(totalTime) } if(requestLogger.isTraceEnabled) - requestLogger.trace("Completed request:%s from client %s;totalTime:%d,queueTime:%d,localTime:%d,remoteTime:%d,sendTime:%d" - .format(requestObj, remoteAddress, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime)) + requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" + .format(requestObj, remoteAddress, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) } } @@ -182,6 +185,8 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { val localTimeHist = newHistogram(name + "-LocalTimeMs") // time a request takes to wait on remote brokers (only relevant to fetch and produce requests) val remoteTimeHist = newHistogram(name + "-RemoteTimeMs") + // time a response spent in a response queue + val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs") // time to send the response to the requester val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs") val totalTimeHist = newHistogram(name + "-TotalTimeMs") diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 216245d..9de032e 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -272,6 +272,7 @@ private[kafka] class Processor(val id: Int, private def processNewResponses() { var curr = requestChannel.receiveResponse(id) while(curr != null) { + curr.request.responseDequeueTimeMs = SystemTime.milliseconds val key = curr.request.requestKey.asInstanceOf[SelectionKey] try { curr.responseAction match {