Thanks for the patch. Some comments:
1. RequestOrResponse.handleError: We probably shouldn't couple generating the error response with sending it through RequestChannel. It's probably better to change handleError() to something like generateErrorResponse() that only takes an error code and returns a response. The sending of the response would then be done in KafkaApis. Similarly, I think the error logging should be done in KafkaApis.handle() too.
2. ProducerRequest: When printing out data, in addition to the key, it would be useful to print out messageSet.sizeInBytes().
3.1 Doesn't compile.
3.2 There are statements like the following in the handling of each type of request. Could we move them to handle() and remove the special printing for versionId, correlationId, and clientId?
requestLogger.trace("Handling LeaderAndIsrRequest v%d with correlation id %d from client %s: %s"
.format(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, leaderAndIsrRequest.clientId, leaderAndIsrRequest.toString))
4. RequestChannel.Request: Could you remove versionId, correlationId, clientId in the trace statement in updateRequestMetrics() and remove the corresponding deserialization logic?
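
To make 1 and 3.2 more concrete, here is a rough sketch of the shape I have in mind. It uses simplified stand-in types, not the real Kafka classes: the Response/ErrorResponse types, the log callback, the process parameter, and the UnknownErrorCode value are all assumptions for illustration. The point is only that generateErrorResponse() builds a response and nothing else, while handle() does the single trace statement, the error logging, and the send.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins for illustration; not the actual Kafka classes.
trait Response { def correlationId: Int }
case class ErrorResponse(correlationId: Int, errorCode: Short) extends Response

trait RequestOrResponse {
  def versionId: Short
  def correlationId: Int
  def clientId: String
  // Only builds the response: no sending, no logging.
  def generateErrorResponse(errorCode: Short): Response
}

// toString of a case class already includes versionId, correlationId and clientId,
// so the trace statement does not need to print them separately.
case class LeaderAndIsrRequest(versionId: Short, correlationId: Int, clientId: String)
    extends RequestOrResponse {
  def generateErrorResponse(errorCode: Short): Response =
    ErrorResponse(correlationId, errorCode)
}

// Stand-in for RequestChannel that just records what was sent.
class RequestChannel {
  val sent: ArrayBuffer[Response] = ArrayBuffer.empty[Response]
  def sendResponse(response: Response): Unit = { sent += response; () }
}

class KafkaApis(channel: RequestChannel, log: String => Unit) {
  // Assumed placeholder error code for this sketch.
  val UnknownErrorCode: Short = (-1).toShort

  // One trace statement for every request type; error logging and the
  // sending of the error response both live here rather than in the request.
  def handle(request: RequestOrResponse)(process: RequestOrResponse => Unit): Unit = {
    log("Handling request: " + request.toString)
    try process(request)
    catch {
      case e: Exception =>
        log("Error when handling request " + request.toString + ": " + e.getMessage)
        channel.sendResponse(request.generateErrorResponse(UnknownErrorCode))
    }
  }
}
```

With this split, the per-request handlers no longer need their own requestLogger.trace(...) lines, and the request classes don't need a reference to RequestChannel.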