Description
KafkaApis.updateRecordConversionStats may be called multiple times for a single request (once per topic partition from processingStatsCallback), and each call overwrites request.messageConversionsTimeNanos and request.temporaryMemoryBytes:
private def updateRecordConversionStats(request: RequestChannel.Request,
                                        tp: TopicPartition,
                                        conversionStats: RecordValidationStats): Unit = {
  ...
    request.messageConversionsTimeNanos = conversionStats.conversionTimeNanos
  }
  request.temporaryMemoryBytes = conversionStats.temporaryMemoryBytes
}
def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
  processingStats.forKeyValue { (tp, info) =>
    updateRecordConversionStats(request, tp, info)
  }
}
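As a result, when the callback covers more than one partition, MessageConversionsTimeMs and TemporaryMemoryBytes reflect only the values from the most recent call rather than the request as a whole, so they are not recorded correctly.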
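The effect can be reproduced outside the broker with a minimal, self-contained sketch. Request and Stats below are hypothetical stand-ins for RequestChannel.Request and RecordValidationStats (only the two fields in question are modeled), the guard elided in the snippet above is left out, and the sample numbers are made up. Summing per-partition values is shown as one possible remedy, not necessarily the fix that was adopted.

```scala
// Hypothetical stand-in for RequestChannel.Request: only the two
// per-request fields discussed in this issue are modeled.
final class Request {
  var messageConversionsTimeNanos: Long = 0L
  var temporaryMemoryBytes: Long = 0L
}

// Hypothetical stand-in for the per-partition conversion stats.
final case class Stats(conversionTimeNanos: Long, temporaryMemoryBytes: Long)

object ConversionStatsSketch {
  // Behavior described in this issue: every call replaces the previous value.
  def overwrite(request: Request, stats: Stats): Unit = {
    request.messageConversionsTimeNanos = stats.conversionTimeNanos
    request.temporaryMemoryBytes = stats.temporaryMemoryBytes
  }

  // One possible remedy (an assumption): accumulate across calls so the
  // request-level values cover every partition.
  def accumulate(request: Request, stats: Stats): Unit = {
    request.messageConversionsTimeNanos += stats.conversionTimeNanos
    request.temporaryMemoryBytes += stats.temporaryMemoryBytes
  }

  def main(args: Array[String]): Unit = {
    // Made-up stats for three partitions of the same request.
    val perPartition = Seq(Stats(1000L, 64L), Stats(2000L, 128L), Stats(500L, 32L))

    val overwritten = new Request
    perPartition.foreach(s => overwrite(overwritten, s))

    val accumulated = new Request
    perPartition.foreach(s => accumulate(accumulated, s))

    // Only the last partition survives: 500 ns, 32 bytes.
    println(s"overwrite:  ${overwritten.messageConversionsTimeNanos} ns, ${overwritten.temporaryMemoryBytes} bytes")
    // All partitions are counted: 3500 ns, 224 bytes.
    println(s"accumulate: ${accumulated.messageConversionsTimeNanos} ns, ${accumulated.temporaryMemoryBytes} bytes")
  }
}
```

With the overwriting version, the stats of the first two partitions are lost; with accumulation, the request-level values cover all three.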