Index: core/src/main/scala/kafka/network/RequestChannel.scala =================================================================== --- core/src/main/scala/kafka/network/RequestChannel.scala (revision 1401907) +++ core/src/main/scala/kafka/network/RequestChannel.scala (working copy) @@ -40,9 +40,9 @@ } case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long) { - var dequeueTimeMs = -1L - var apiLocalCompleteTimeMs = -1L - var responseCompleteTimeMs = -1L + @volatile var dequeueTimeMs = -1L + @volatile var apiLocalCompleteTimeMs = -1L + @volatile var responseCompleteTimeMs = -1L val requestId = buffer.getShort() val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer) buffer.rewind() @@ -50,6 +50,10 @@ def updateRequestMetrics() { val endTimeMs = SystemTime.milliseconds + // In some corner cases, apiLocalCompleteTimeMs may not be set when the request completes since the remote + // processing time is really small. In this case, use responseCompleteTimeMs as apiLocalCompleteTimeMs. + if (apiLocalCompleteTimeMs < 0) + apiLocalCompleteTimeMs = responseCompleteTimeMs val queueTime = (dequeueTimeMs - startTimeMs).max(0L) val apiLocalTime = (apiLocalCompleteTimeMs - dequeueTimeMs).max(0L) val apiRemoteTime = (responseCompleteTimeMs - apiLocalCompleteTimeMs).max(0L) @@ -71,8 +75,9 @@ m.responseSendTimeHist.update(responseSendTime) m.totalTimeHist.update(totalTime) } + info("Completed request: %s totalTime:%d queueTime:%d localTime:%d remoteTime:%d sendTime:%d" + .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime)) } - trace("Completed request: %s".format(requestObj)) } case class Response(processor: Int, request: Request, responseSend: Send) {