Description
This issue was introduced in KAFKA-2929. The problem is that we are using o.a.k.common.protocol.Errors.forException() while some exceptions thrown by the broker are still the old Scala exceptions. This causes Errors.forException() to always return UnknownServerException for them.
InvalidMessageException extends CorruptRecordException, but Errors.forException() appears to require an exact class match, so it does not map the subclass InvalidMessageException to the error code of its parent. Instead it returns -1, which is UnknownServerException.
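
To illustrate the behaviour described above, here is a minimal, self-contained sketch. It is not the actual Errors implementation: the class ErrorLookupSketch, its methods exactLookup and hierarchyLookup, the nested exception classes, and the code value used for the corrupt-record error are all hypothetical stand-ins. It contrasts an exact-class lookup with one possible alternative that walks up the superclass chain.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; names and code values are illustrative, not Kafka's real API.
class ErrorLookupSketch {
    // Stand-ins for the real exceptions; only the inheritance relationship matters.
    static class CorruptRecordException extends RuntimeException {}
    static class InvalidMessageException extends CorruptRecordException {}

    static final short UNKNOWN = -1;         // UnknownServerException, as in the description
    static final short CORRUPT_MESSAGE = 2;  // assumed value, for illustration only

    private static final Map<Class<?>, Short> CODES = new HashMap<>();
    static {
        // Only the parent class is registered, mirroring the situation in this issue.
        CODES.put(CorruptRecordException.class, CORRUPT_MESSAGE);
    }

    // Exact-class lookup: a subclass instance misses the map and falls back to UNKNOWN.
    static short exactLookup(Throwable t) {
        return CODES.getOrDefault(t.getClass(), UNKNOWN);
    }

    // Walks up the superclass chain until a registered class is found.
    static short hierarchyLookup(Throwable t) {
        for (Class<?> c = t.getClass(); c != null; c = c.getSuperclass()) {
            Short code = CODES.get(c);
            if (code != null) {
                return code;
            }
        }
        return UNKNOWN;
    }
}
```

With this sketch, exactLookup(new InvalidMessageException()) falls back to UNKNOWN, while hierarchyLookup resolves the same exception to the code registered for CorruptRecordException.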