Description
The broker topic metric FetchMessageConversionsPerSec is not recorded when a fetch response undergoes message conversion.
The bug is that we pass in a callback that expects a MultiRecordsSend in KafkaApis:
def updateConversionStats(send: Send): Unit = {
  send match {
    case send: MultiRecordsSend if send.recordConversionStats != null =>
      send.recordConversionStats.asScala.toMap.foreach {
        case (tp, stats) => updateRecordConversionStats(request, tp, stats)
      }
    case _ =>
  }
}
But we call this callback with a NetworkSend in the SocketServer:
selector.completedSends.forEach { send =>
  try {
    val response = inflightResponses.remove(send.destinationId).getOrElse {
      throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
    }
    updateRequestMetrics(response)

    // Invoke send completion callback
    response.onComplete.foreach(onComplete => onComplete(send))
    ...
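Note that Selector.completedSends returns a collection of NetworkSend, so the MultiRecordsSend case in the callback never matches. The call falls through to the default case and the conversion stats are never recorded.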
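The mismatch can be reproduced in isolation. The sketch below uses simplified stand-in types rather than the real Kafka classes: Send, MultiRecordsSend and NetworkSend here are hypothetical minimal versions, the stats map is reduced to Map[String, Int], and it is an assumption that the NetworkSend carries the original payload. statsWithUnwrap shows one possible remedy under that assumption, not necessarily the fix that was applied.

```scala
// Simplified stand-ins for illustration only; these are NOT the real Kafka classes.
trait Send
final case class MultiRecordsSend(recordConversionStats: Map[String, Int]) extends Send
// Assumption: the network-level send wraps the original payload.
final case class NetworkSend(destinationId: String, send: Send) extends Send

object ConversionStatsSketch {
  // Same shape as the callback in the report: only a MultiRecordsSend yields stats.
  def statsSeenByCallback(send: Send): Map[String, Int] = send match {
    case m: MultiRecordsSend if m.recordConversionStats != null => m.recordConversionStats
    case _ => Map.empty // a NetworkSend lands here, so nothing is recorded
  }

  // One possible remedy (hypothetical): unwrap the NetworkSend before matching.
  def statsWithUnwrap(send: Send): Map[String, Int] = send match {
    case NetworkSend(_, inner) => statsSeenByCallback(inner)
    case other                 => statsSeenByCallback(other)
  }
}
```

Passing a wrapped send to statsSeenByCallback returns an empty map, while statsWithUnwrap returns the stats carried by the inner MultiRecordsSend.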
Issue Links
- is related to KAFKA-14743 MessageConversionsTimeMs for fetch request metric is not updated (Resolved)
- links to